72 lines
2.5 KiB
Python
72 lines
2.5 KiB
Python
import asyncio
|
|
import io
|
|
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
from app.clients.storage.base import StorageClient
|
|
from app.core.exceptions import StorageError
|
|
from app.core.logging import get_logger
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class RustFSClient(StorageClient):
|
|
def __init__(self, endpoint: str, access_key: str, secret_key: str) -> None:
|
|
self._s3 = boto3.client(
|
|
"s3",
|
|
endpoint_url=endpoint,
|
|
aws_access_key_id=access_key,
|
|
aws_secret_access_key=secret_key,
|
|
)
|
|
|
|
async def download_bytes(self, bucket: str, path: str) -> bytes:
|
|
loop = asyncio.get_running_loop()
|
|
try:
|
|
resp = await loop.run_in_executor(
|
|
None, lambda: self._s3.get_object(Bucket=bucket, Key=path)
|
|
)
|
|
return resp["Body"].read()
|
|
except ClientError as exc:
|
|
print(exc)
|
|
raise StorageError(f"存储下载失败 [{bucket}/{path}]: {exc}") from exc
|
|
|
|
async def upload_bytes(
|
|
self, bucket: str, path: str, data: bytes, content_type: str = "application/octet-stream"
|
|
) -> None:
|
|
loop = asyncio.get_running_loop()
|
|
try:
|
|
await loop.run_in_executor(
|
|
None,
|
|
lambda: self._s3.put_object(
|
|
Bucket=bucket, Key=path, Body=io.BytesIO(data), ContentType=content_type
|
|
),
|
|
)
|
|
except ClientError as exc:
|
|
raise StorageError(f"存储上传失败 [{bucket}/{path}]: {exc}") from exc
|
|
|
|
async def get_presigned_url(self, bucket: str, path: str, expires: int = 3600) -> str:
|
|
loop = asyncio.get_running_loop()
|
|
try:
|
|
url = await loop.run_in_executor(
|
|
None,
|
|
lambda: self._s3.generate_presigned_url(
|
|
"get_object",
|
|
Params={"Bucket": bucket, "Key": path},
|
|
ExpiresIn=expires,
|
|
),
|
|
)
|
|
return url
|
|
except ClientError as exc:
|
|
raise StorageError(f"生成预签名 URL 失败 [{bucket}/{path}]: {exc}") from exc
|
|
|
|
async def get_object_size(self, bucket: str, path: str) -> int:
|
|
loop = asyncio.get_running_loop()
|
|
try:
|
|
resp = await loop.run_in_executor(
|
|
None, lambda: self._s3.head_object(Bucket=bucket, Key=path)
|
|
)
|
|
return resp["ContentLength"]
|
|
except ClientError as exc:
|
|
raise StorageError(f"获取文件大小失败 [{bucket}/{path}]: {exc}") from exc
|