From 94cb27e95f10790251e76570bb052583ed67028e Mon Sep 17 00:00:00 2001 From: wh Date: Thu, 9 Apr 2026 13:16:53 +0800 Subject: [PATCH] =?UTF-8?q?feat(common):=20=E6=B7=BB=E5=8A=A0=20RustFsClie?= =?UTF-8?q?nt=20=E5=92=8C=20AiServiceClient=20(T018/T019)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/label/common/ai/AiServiceClient.java | 149 ++++++++++++++++++ .../label/common/storage/RustFsClient.java | 118 ++++++++++++++ 2 files changed, 267 insertions(+) create mode 100644 src/main/java/com/label/common/ai/AiServiceClient.java create mode 100644 src/main/java/com/label/common/storage/RustFsClient.java diff --git a/src/main/java/com/label/common/ai/AiServiceClient.java b/src/main/java/com/label/common/ai/AiServiceClient.java new file mode 100644 index 0000000..93da8f9 --- /dev/null +++ b/src/main/java/com/label/common/ai/AiServiceClient.java @@ -0,0 +1,149 @@ +package com.label.common.ai; + +import lombok.Builder; +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestClient; + +import jakarta.annotation.PostConstruct; +import java.util.List; +import java.util.Map; + +@Component +public class AiServiceClient { + + @Value("${ai-service.base-url}") + private String baseUrl; + + @Value("${ai-service.timeout:30000}") + private int timeoutMs; + + private RestClient restClient; + + @PostConstruct + public void init() { + restClient = RestClient.builder() + .baseUrl(baseUrl) + .build(); + } + + // DTO classes + + @Data + @Builder + public static class ExtractionRequest { + private Long sourceId; + private String filePath; + private String bucket; + private String model; + private String prompt; + } + + @Data + public static class ExtractionResponse { + private List> items; // triple/quadruple items + private String rawOutput; + } + + @Data + @Builder + public static class VideoProcessRequest { + private Long sourceId; + private String filePath; + private String bucket; + private Map params; // frameInterval, mode etc. + } + + @Data + public static class QaGenResponse { + private List> qaPairs; + } + + @Data + @Builder + public static class FinetuneRequest { + private String datasetPath; // RustFS path to JSONL file + private String model; + private Long batchId; + } + + @Data + public static class FinetuneResponse { + private String jobId; + private String status; + } + + @Data + public static class FinetuneStatusResponse { + private String jobId; + private String status; // PENDING/RUNNING/COMPLETED/FAILED + private Integer progress; // 0-100 + private String errorMessage; + } + + // The 8 endpoints: + + public ExtractionResponse extractText(ExtractionRequest request) { + return restClient.post() + .uri("/extract/text") + .body(request) + .retrieve() + .body(ExtractionResponse.class); + } + + public ExtractionResponse extractImage(ExtractionRequest request) { + return restClient.post() + .uri("/extract/image") + .body(request) + .retrieve() + .body(ExtractionResponse.class); + } + + public void extractFrames(VideoProcessRequest request) { + restClient.post() + .uri("/video/extract-frames") + .body(request) + .retrieve() + .toBodilessEntity(); + } + + public void videoToText(VideoProcessRequest request) { + restClient.post() + .uri("/video/to-text") + .body(request) + .retrieve() + .toBodilessEntity(); + } + + public QaGenResponse genTextQa(ExtractionRequest request) { + return restClient.post() + .uri("/qa/gen-text") + .body(request) + .retrieve() + .body(QaGenResponse.class); + } + + public QaGenResponse genImageQa(ExtractionRequest request) { + return restClient.post() + .uri("/qa/gen-image") + .body(request) + .retrieve() + .body(QaGenResponse.class); + } + + public FinetuneResponse startFinetune(FinetuneRequest request) { + return restClient.post() + .uri("/finetune/start") + .body(request) + .retrieve() + .body(FinetuneResponse.class); + } + + public FinetuneStatusResponse getFinetuneStatus(String jobId) { + return restClient.get() + .uri("/finetune/status/{jobId}", jobId) + .retrieve() + .body(FinetuneStatusResponse.class); + } +} diff --git a/src/main/java/com/label/common/storage/RustFsClient.java b/src/main/java/com/label/common/storage/RustFsClient.java new file mode 100644 index 0000000..b9bd1ae --- /dev/null +++ b/src/main/java/com/label/common/storage/RustFsClient.java @@ -0,0 +1,118 @@ +package com.label.common.storage; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.presigner.S3Presigner; +import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest; + +import jakarta.annotation.PostConstruct; +import java.io.InputStream; +import java.net.URI; +import java.time.Duration; + +@Slf4j +@Component +public class RustFsClient { + + @Value("${rustfs.endpoint}") + private String endpoint; + + @Value("${rustfs.access-key}") + private String accessKey; + + @Value("${rustfs.secret-key}") + private String secretKey; + + private S3Client s3Client; + private S3Presigner presigner; + + @PostConstruct + public void init() { + var credentials = StaticCredentialsProvider.create( + AwsBasicCredentials.create(accessKey, secretKey)); + + s3Client = S3Client.builder() + .endpointOverride(URI.create(endpoint)) + .credentialsProvider(credentials) + .region(Region.US_EAST_1) + .forcePathStyle(true) // Required for MinIO/RustFS + .build(); + + presigner = S3Presigner.builder() + .endpointOverride(URI.create(endpoint)) + .credentialsProvider(credentials) + .region(Region.US_EAST_1) + .build(); + } + + /** + * Upload file to RustFS. + * @param bucket bucket name + * @param key object key (path) + * @param inputStream file content + * @param contentLength file size in bytes + * @param contentType MIME type + */ + public void upload(String bucket, String key, InputStream inputStream, + long contentLength, String contentType) { + // Ensure bucket exists + ensureBucketExists(bucket); + + s3Client.putObject( + PutObjectRequest.builder() + .bucket(bucket) + .key(key) + .contentType(contentType) + .contentLength(contentLength) + .build(), + RequestBody.fromInputStream(inputStream, contentLength) + ); + } + + /** + * Download file from RustFS. + */ + public InputStream download(String bucket, String key) { + return s3Client.getObject( + GetObjectRequest.builder().bucket(bucket).key(key).build() + ); + } + + /** + * Delete file from RustFS. + */ + public void delete(String bucket, String key) { + s3Client.deleteObject( + DeleteObjectRequest.builder().bucket(bucket).key(key).build() + ); + } + + /** + * Generate a presigned URL for temporary read access. + * @param expirationMinutes URL validity in minutes + */ + public String getPresignedUrl(String bucket, String key, int expirationMinutes) { + var presignRequest = GetObjectPresignRequest.builder() + .signatureDuration(Duration.ofMinutes(expirationMinutes)) + .getObjectRequest(GetObjectRequest.builder().bucket(bucket).key(key).build()) + .build(); + + return presigner.presignGetObject(presignRequest).url().toString(); + } + + private void ensureBucketExists(String bucket) { + try { + s3Client.headBucket(HeadBucketRequest.builder().bucket(bucket).build()); + } catch (NoSuchBucketException e) { + s3Client.createBucket(CreateBucketRequest.builder().bucket(bucket).build()); + log.info("Created bucket: {}", bucket); + } + } +}