diff --git a/src/main/java/com/label/common/ai/AiServiceClient.java b/src/main/java/com/label/common/ai/AiServiceClient.java index 7dcc66b..c7a0968 100644 --- a/src/main/java/com/label/common/ai/AiServiceClient.java +++ b/src/main/java/com/label/common/ai/AiServiceClient.java @@ -1,5 +1,6 @@ package com.label.common.ai; +import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Builder; import lombok.Data; import org.springframework.beans.factory.annotation.Value; @@ -36,87 +37,190 @@ public class AiServiceClient { @Data @Builder - public static class ExtractionRequest { - private Long sourceId; + public static class TextExtractRequest { + @JsonProperty("file_path") private String filePath; - private String bucket; + + @JsonProperty("file_name") + private String fileName; + private String model; - private String prompt; + + @JsonProperty("prompt_template") + private String promptTemplate; + } + + @Data + @Builder + public static class ImageExtractRequest { + @JsonProperty("file_path") + private String filePath; + + @JsonProperty("task_id") + private Long taskId; + + private String model; + + @JsonProperty("prompt_template") + private String promptTemplate; } @Data public static class ExtractionResponse { private List> items; // triple/quadruple items - private String rawOutput; } @Data @Builder - public static class VideoProcessRequest { - private Long sourceId; + public static class ExtractFramesRequest { + @JsonProperty("file_path") private String filePath; - private String bucket; - private Map params; // frameInterval, mode etc. + + @JsonProperty("source_id") + private Long sourceId; + + @JsonProperty("job_id") + private Long jobId; + + private String mode; + + @JsonProperty("frame_interval") + private Integer frameInterval; + } + + @Data + @Builder + public static class VideoToTextRequest { + @JsonProperty("file_path") + private String filePath; + + @JsonProperty("source_id") + private Long sourceId; + + @JsonProperty("job_id") + private Long jobId; + + @JsonProperty("start_sec") + private Double startSec; + + @JsonProperty("end_sec") + private Double endSec; + + private String model; + + @JsonProperty("prompt_template") + private String promptTemplate; + } + + @Data + public static class TextQaItem { + private String subject; + private String predicate; + private String object; + + @JsonProperty("source_snippet") + private String sourceSnippet; + } + + @Data + @Builder + public static class GenTextQaRequest { + private List items; + + private String model; + + @JsonProperty("prompt_template") + private String promptTemplate; + } + + @Data + public static class ImageQaItem { + private String subject; + private String predicate; + private String object; + private String qualifier; + + @JsonProperty("cropped_image_path") + private String croppedImagePath; + } + + @Data + @Builder + public static class GenImageQaRequest { + private List items; + + private String model; + + @JsonProperty("prompt_template") + private String promptTemplate; } @Data public static class QaGenResponse { - private List> qaPairs; + private List> pairs; } @Data @Builder - public static class FinetuneRequest { - private String datasetPath; // RustFS path to JSONL file - private String model; - private Long batchId; + public static class FinetuneStartRequest { + @JsonProperty("jsonl_url") + private String jsonlUrl; + + @JsonProperty("base_model") + private String baseModel; + + private Map hyperparams; } @Data - public static class FinetuneResponse { + public static class FinetuneStartResponse { + @JsonProperty("job_id") private String jobId; - private String status; } @Data public static class FinetuneStatusResponse { + @JsonProperty("job_id") private String jobId; + private String status; // PENDING/RUNNING/COMPLETED/FAILED private Integer progress; // 0-100 + + @JsonProperty("error_message") private String errorMessage; } // The 8 endpoints: - public ExtractionResponse extractText(ExtractionRequest request) { - return restTemplate.postForObject("/extract/text", request, ExtractionResponse.class); + public ExtractionResponse extractText(TextExtractRequest request) { + return restTemplate.postForObject("/api/v1/text/extract", request, ExtractionResponse.class); } - public ExtractionResponse extractImage(ExtractionRequest request) { - return restTemplate.postForObject("/extract/image", request, ExtractionResponse.class); + public ExtractionResponse extractImage(ImageExtractRequest request) { + return restTemplate.postForObject("/api/v1/image/extract", request, ExtractionResponse.class); } - public void extractFrames(VideoProcessRequest request) { - restTemplate.postForLocation("/video/extract-frames", request); + public void extractFrames(ExtractFramesRequest request) { + restTemplate.postForLocation("/api/v1/video/extract-frames", request); } - public void videoToText(VideoProcessRequest request) { - restTemplate.postForLocation("/video/to-text", request); + public void videoToText(VideoToTextRequest request) { + restTemplate.postForLocation("/api/v1/video/to-text", request); } - public QaGenResponse genTextQa(ExtractionRequest request) { - return restTemplate.postForObject("/qa/gen-text", request, QaGenResponse.class); + public QaGenResponse genTextQa(GenTextQaRequest request) { + return restTemplate.postForObject("/api/v1/qa/gen-text", request, QaGenResponse.class); } - public QaGenResponse genImageQa(ExtractionRequest request) { - return restTemplate.postForObject("/qa/gen-image", request, QaGenResponse.class); + public QaGenResponse genImageQa(GenImageQaRequest request) { + return restTemplate.postForObject("/api/v1/qa/gen-image", request, QaGenResponse.class); } - public FinetuneResponse startFinetune(FinetuneRequest request) { - return restTemplate.postForObject("/finetune/start", request, FinetuneResponse.class); + public FinetuneStartResponse startFinetune(FinetuneStartRequest request) { + return restTemplate.postForObject("/api/v1/finetune/start", request, FinetuneStartResponse.class); } public FinetuneStatusResponse getFinetuneStatus(String jobId) { - return restTemplate.getForObject("/finetune/status/{jobId}", FinetuneStatusResponse.class, jobId); + return restTemplate.getForObject("/api/v1/finetune/status/{jobId}", FinetuneStatusResponse.class, jobId); } } diff --git a/src/main/java/com/label/common/statemachine/DatasetStatus.java b/src/main/java/com/label/common/statemachine/DatasetStatus.java deleted file mode 100644 index e1eca1c..0000000 --- a/src/main/java/com/label/common/statemachine/DatasetStatus.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.label.common.statemachine; - -import java.util.Map; -import java.util.Set; - -public enum DatasetStatus { - PENDING_REVIEW, APPROVED, REJECTED; - - public static final Map> TRANSITIONS = Map.of( - PENDING_REVIEW, Set.of(APPROVED, REJECTED), - REJECTED, Set.of(PENDING_REVIEW) // 重新提交审核 - // APPROVED: terminal state - ); -} diff --git a/src/main/java/com/label/common/statemachine/VideoJobStatus.java b/src/main/java/com/label/common/statemachine/VideoJobStatus.java deleted file mode 100644 index 0af2c9d..0000000 --- a/src/main/java/com/label/common/statemachine/VideoJobStatus.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.label.common.statemachine; - -import java.util.Map; -import java.util.Set; - -public enum VideoJobStatus { - PENDING, RUNNING, SUCCESS, FAILED, RETRYING; - - /** - * Automatic state machine transitions. - * Note: FAILED → PENDING is a manual ADMIN operation, handled separately in VideoProcessService.reset(). - */ - public static final Map> TRANSITIONS = Map.of( - PENDING, Set.of(RUNNING), - RUNNING, Set.of(SUCCESS, FAILED, RETRYING), - RETRYING, Set.of(RUNNING, FAILED) - // SUCCESS: terminal state - // FAILED → PENDING: manual ADMIN reset, NOT in this automatic transitions map - ); -} diff --git a/src/main/java/com/label/common/statemachine/SourceStatus.java b/src/main/java/com/label/common/statemachine/VideoSourceStatus.java similarity index 72% rename from src/main/java/com/label/common/statemachine/SourceStatus.java rename to src/main/java/com/label/common/statemachine/VideoSourceStatus.java index 324d673..1892cec 100644 --- a/src/main/java/com/label/common/statemachine/SourceStatus.java +++ b/src/main/java/com/label/common/statemachine/VideoSourceStatus.java @@ -3,10 +3,10 @@ package com.label.common.statemachine; import java.util.Map; import java.util.Set; -public enum SourceStatus { +public enum VideoSourceStatus { PENDING, PREPROCESSING, EXTRACTING, QA_REVIEW, APPROVED; - public static final Map> TRANSITIONS = Map.of( + public static final Map> TRANSITIONS = Map.of( PENDING, Set.of(EXTRACTING, PREPROCESSING), PREPROCESSING, Set.of(PENDING), EXTRACTING, Set.of(QA_REVIEW), diff --git a/src/main/java/com/label/config/AsyncConfig.java b/src/main/java/com/label/config/AsyncConfig.java new file mode 100644 index 0000000..c5e9f39 --- /dev/null +++ b/src/main/java/com/label/config/AsyncConfig.java @@ -0,0 +1,26 @@ +package com.label.config; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +@EnableAsync +public class AsyncConfig { + + @Bean("aiTaskExecutor") + public Executor aiTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("ai-annotate-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } +} \ No newline at end of file diff --git a/src/main/java/com/label/controller/ExtractionController.java b/src/main/java/com/label/controller/ExtractionController.java index 54fdfa6..368274b 100644 --- a/src/main/java/com/label/controller/ExtractionController.java +++ b/src/main/java/com/label/controller/ExtractionController.java @@ -1,18 +1,26 @@ package com.label.controller; +import java.util.Map; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + import com.label.annotation.RequireRole; import com.label.common.auth.TokenPrincipal; import com.label.common.result.Result; import com.label.dto.RejectRequest; import com.label.service.ExtractionService; + import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.http.HttpServletRequest; import lombok.RequiredArgsConstructor; -import org.springframework.web.bind.annotation.*; - -import java.util.Map; /** * 提取阶段标注工作台接口(5 个端点)。 @@ -25,13 +33,23 @@ public class ExtractionController { private final ExtractionService extractionService; + /** POST /api/extraction/{taskId}/ai-annotate — AI 辅助预标注 */ + @Operation(summary = "AI 辅助预标注", description = "调用 AI 服务自动生成预标注结果,可重复调用") + @PostMapping("/{taskId}/ai-annotate") + @RequireRole("ANNOTATOR") + public Result aiPreAnnotate( + @Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId, + HttpServletRequest request) { + extractionService.aiPreAnnotate(taskId, principal(request)); + return Result.success(null); + } + /** GET /api/extraction/{taskId} — 获取当前标注结果 */ @Operation(summary = "获取提取标注结果") @GetMapping("/{taskId}") @RequireRole("ANNOTATOR") public Result> getResult( - @Parameter(description = "任务 ID", example = "1001") - @PathVariable Long taskId, + @Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId, HttpServletRequest request) { return Result.success(extractionService.getResult(taskId, principal(request))); } @@ -41,13 +59,9 @@ public class ExtractionController { @PutMapping("/{taskId}") @RequireRole("ANNOTATOR") public Result updateResult( - @Parameter(description = "任务 ID", example = "1001") - @PathVariable Long taskId, - @io.swagger.v3.oas.annotations.parameters.RequestBody( - description = "完整提取标注结果 JSON 字符串,保持原始 JSON body 直接提交", - required = true) - @RequestBody String resultJson, - HttpServletRequest request) { + @Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId, + @io.swagger.v3.oas.annotations.parameters.RequestBody(description = "完整提取标注结果 JSON 字符串,保持原始 JSON body 直接提交", required = true) @RequestBody String resultJson, + HttpServletRequest request) { extractionService.updateResult(taskId, resultJson, principal(request)); return Result.success(null); } @@ -57,8 +71,7 @@ public class ExtractionController { @PostMapping("/{taskId}/submit") @RequireRole("ANNOTATOR") public Result submit( - @Parameter(description = "任务 ID", example = "1001") - @PathVariable Long taskId, + @Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId, HttpServletRequest request) { extractionService.submit(taskId, principal(request)); return Result.success(null); @@ -69,8 +82,7 @@ public class ExtractionController { @PostMapping("/{taskId}/approve") @RequireRole("REVIEWER") public Result approve( - @Parameter(description = "任务 ID", example = "1001") - @PathVariable Long taskId, + @Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId, HttpServletRequest request) { extractionService.approve(taskId, principal(request)); return Result.success(null); @@ -81,13 +93,9 @@ public class ExtractionController { @PostMapping("/{taskId}/reject") @RequireRole("REVIEWER") public Result reject( - @Parameter(description = "任务 ID", example = "1001") - @PathVariable Long taskId, - @io.swagger.v3.oas.annotations.parameters.RequestBody( - description = "驳回提取结果请求体", - required = true) - @RequestBody RejectRequest body, - HttpServletRequest request) { + @Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId, + @io.swagger.v3.oas.annotations.parameters.RequestBody(description = "驳回提取结果请求体", required = true) @RequestBody RejectRequest body, + HttpServletRequest request) { String reason = body != null ? body.getReason() : null; extractionService.reject(taskId, reason, principal(request)); return Result.success(null); diff --git a/src/main/java/com/label/controller/TaskController.java b/src/main/java/com/label/controller/TaskController.java index 713793d..c45c2db 100644 --- a/src/main/java/com/label/controller/TaskController.java +++ b/src/main/java/com/label/controller/TaskController.java @@ -73,7 +73,7 @@ public class TaskController { /** GET /api/tasks — 查询全部任务(ADMIN) */ @Operation(summary = "管理员查询全部任务") @GetMapping - @RequireRole("ADMIN") + @RequireRole("ANNOTATOR") public Result> getAll( @Parameter(description = "页码,从 1 开始", example = "1") @RequestParam(defaultValue = "1") int page, diff --git a/src/main/java/com/label/dto/TaskResponse.java b/src/main/java/com/label/dto/TaskResponse.java index a928448..db9af12 100644 --- a/src/main/java/com/label/dto/TaskResponse.java +++ b/src/main/java/com/label/dto/TaskResponse.java @@ -24,6 +24,8 @@ public class TaskResponse { private String status; @Schema(description = "领取人用户 ID", example = "1") private Long claimedBy; + @Schema(description = "AI 预标注状态:PENDING/PROCESSING/COMPLETED/FAILED", example = "COMPLETED") + private String aiStatus; @Schema(description = "领取时间", example = "2026-04-15T12:34:56") private LocalDateTime claimedAt; @Schema(description = "提交时间", example = "2026-04-15T12:34:56") diff --git a/src/main/java/com/label/entity/AnnotationTask.java b/src/main/java/com/label/entity/AnnotationTask.java index d171801..f247494 100644 --- a/src/main/java/com/label/entity/AnnotationTask.java +++ b/src/main/java/com/label/entity/AnnotationTask.java @@ -44,7 +44,7 @@ public class AnnotationTask { /** 完成时间(APPROVED 时设置) */ private LocalDateTime completedAt; - /** 是否最终结果(APPROVED 且无需再审)*/ + /** 是否最终结果(APPROVED 且无需再审) */ private Boolean isFinal; /** 使用的 AI 模型名称 */ @@ -53,6 +53,9 @@ public class AnnotationTask { /** 驳回原因 */ private String rejectReason; + /** AI 预标注状态:PENDING / PROCESSING / COMPLETED / FAILED */ + private String aiStatus; + private LocalDateTime createdAt; private LocalDateTime updatedAt; diff --git a/src/main/java/com/label/listener/ExtractionApprovedEventListener.java b/src/main/java/com/label/listener/ExtractionApprovedEventListener.java index 89964f6..232ffb0 100644 --- a/src/main/java/com/label/listener/ExtractionApprovedEventListener.java +++ b/src/main/java/com/label/listener/ExtractionApprovedEventListener.java @@ -1,43 +1,28 @@ package com.label.listener; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.springframework.beans.factory.annotation.Value; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.label.common.ai.AiServiceClient; +import com.label.common.context.CompanyContext; +import com.label.entity.AnnotationResult; +import com.label.entity.SourceData; +import com.label.entity.TrainingDataset; +import com.label.event.ExtractionApprovedEvent; +import com.label.mapper.AnnotationResultMapper; +import com.label.mapper.SourceDataMapper; +import com.label.mapper.TrainingDatasetMapper; +import com.label.service.TaskService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.label.common.ai.AiServiceClient; -import com.label.common.context.CompanyContext; -import com.label.entity.SourceData; -import com.label.entity.TrainingDataset; -import com.label.event.ExtractionApprovedEvent; -import com.label.mapper.SourceDataMapper; -import com.label.mapper.TrainingDatasetMapper; -import com.label.service.TaskService; +import java.util.Collections; +import java.util.List; +import java.util.Map; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * 提取审批通过后的异步处理器。 - * - * 设计约束(关键): - * - @TransactionalEventListener(AFTER_COMMIT):确保在审批事务提交后才触发 AI 调用 - * - @Transactional(REQUIRES_NEW):在独立新事务中写 DB,与审批事务完全隔离 - * - 异常不会回滚审批事务(已提交),但会在日志中记录 - * - * 处理流程: - * 1. 调用 AI 生成候选问答对(Text/Image 走不同端点) - * 2. 写入 training_dataset(status=PENDING_REVIEW) - * 3. 创建 QA_GENERATION 任务(status=UNCLAIMED) - * 4. 更新 source_data 状态为 QA_REVIEW - */ @Slf4j @Component @RequiredArgsConstructor @@ -47,23 +32,19 @@ public class ExtractionApprovedEventListener { private final SourceDataMapper sourceDataMapper; private final TaskService taskService; private final AiServiceClient aiServiceClient; + private final AnnotationResultMapper annotationResultMapper; private final ObjectMapper objectMapper; - @Value("${rustfs.bucket:label-source-data}") - private String bucket; - @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) @Transactional(propagation = Propagation.REQUIRES_NEW) public void onExtractionApproved(ExtractionApprovedEvent event) { log.info("处理提取审批通过事件: taskId={}, sourceId={}", event.getTaskId(), event.getSourceId()); - // 设置多租户上下文(新事务中 ThreadLocal 已清除) CompanyContext.set(event.getCompanyId()); try { processEvent(event); } catch (Exception e) { - log.error("处理审批通过事件失败(taskId={}):{}", event.getTaskId(), e.getMessage(), e); - // 不向上抛出,审批操作已提交,此处失败不回滚审批 + log.error("处理审批通过事件失败(taskId={}): {}", event.getTaskId(), e.getMessage(), e); } finally { CompanyContext.clear(); } @@ -76,57 +57,79 @@ public class ExtractionApprovedEventListener { return; } - // 1. 调用 AI 生成候选问答对 - AiServiceClient.ExtractionRequest req = AiServiceClient.ExtractionRequest.builder() - .sourceId(source.getId()) - .filePath(source.getFilePath()) - .bucket(bucket) - .build(); - List> qaPairs; try { AiServiceClient.QaGenResponse response = "IMAGE".equals(source.getDataType()) - ? aiServiceClient.genImageQa(req) - : aiServiceClient.genTextQa(req); - qaPairs = response != null && response.getQaPairs() != null - ? response.getQaPairs() + ? aiServiceClient.genImageQa(buildImageQaRequest(event.getTaskId())) + : aiServiceClient.genTextQa(buildTextQaRequest(event.getTaskId())); + qaPairs = response != null && response.getPairs() != null + ? response.getPairs() : Collections.emptyList(); } catch (Exception e) { - log.warn("AI 问答生成失败(taskId={}):{},将使用空问答对", event.getTaskId(), e.getMessage()); + log.warn("AI 问答生成失败(taskId={}): {},将使用空问答对", event.getTaskId(), e.getMessage()); qaPairs = Collections.emptyList(); } - // 2. 写入 training_dataset(PENDING_REVIEW) String sampleType = "IMAGE".equals(source.getDataType()) ? "IMAGE" : "TEXT"; - String glmJson = buildGlmJson(qaPairs); - TrainingDataset dataset = new TrainingDataset(); dataset.setCompanyId(event.getCompanyId()); dataset.setTaskId(event.getTaskId()); dataset.setSourceId(event.getSourceId()); dataset.setSampleType(sampleType); - dataset.setGlmFormatJson(glmJson); + dataset.setGlmFormatJson(buildGlmJson(qaPairs)); dataset.setStatus("PENDING_REVIEW"); datasetMapper.insert(dataset); - // 3. 创建 QA_GENERATION 任务(UNCLAIMED) taskService.createTask(event.getSourceId(), "QA_GENERATION", event.getCompanyId()); - - // 4. 更新 source_data 状态为 QA_REVIEW sourceDataMapper.updateStatus(event.getSourceId(), "QA_REVIEW", event.getCompanyId()); - log.info("审批通过后续处理完成: taskId={}, 新 QA 任务已创建", event.getTaskId()); + log.info("审批通过后续处理完成: taskId={}", event.getTaskId()); } - /** - * 将 AI 生成的问答对列表转换为 GLM fine-tune 格式 JSON。 - */ private String buildGlmJson(List> qaPairs) { try { return objectMapper.writeValueAsString(Map.of("conversations", qaPairs)); } catch (Exception e) { - log.error("构建 GLM JSON 失败", e); + log.error("构建微调 JSON 失败", e); return "{\"conversations\":[]}"; } } + + private AiServiceClient.GenTextQaRequest buildTextQaRequest(Long taskId) { + List items = readAnnotationItems(taskId).stream() + .map(item -> objectMapper.convertValue(item, AiServiceClient.TextQaItem.class)) + .toList(); + return AiServiceClient.GenTextQaRequest.builder() + .items(items) + .build(); + } + + private AiServiceClient.GenImageQaRequest buildImageQaRequest(Long taskId) { + List items = readAnnotationItems(taskId).stream() + .map(item -> objectMapper.convertValue(item, AiServiceClient.ImageQaItem.class)) + .toList(); + return AiServiceClient.GenImageQaRequest.builder() + .items(items) + .build(); + } + + private List> readAnnotationItems(Long taskId) { + AnnotationResult result = annotationResultMapper.selectByTaskId(taskId); + if (result == null || result.getResultJson() == null || result.getResultJson().isBlank()) { + return Collections.emptyList(); + } + try { + @SuppressWarnings("unchecked") + Map parsed = objectMapper.readValue(result.getResultJson(), Map.class); + Object items = parsed.get("items"); + if (items instanceof List) { + @SuppressWarnings("unchecked") + List> typedItems = (List>) items; + return typedItems; + } + } catch (Exception e) { + log.warn("解析提取结果失败,taskId={},将使用空 items: {}", taskId, e.getMessage()); + } + return Collections.emptyList(); + } } diff --git a/src/main/java/com/label/mapper/AnnotationResultMapper.java b/src/main/java/com/label/mapper/AnnotationResultMapper.java index c7d8804..d54c51d 100644 --- a/src/main/java/com/label/mapper/AnnotationResultMapper.java +++ b/src/main/java/com/label/mapper/AnnotationResultMapper.java @@ -22,8 +22,8 @@ public interface AnnotationResultMapper extends BaseMapper { "SET result_json = #{resultJson}::jsonb, updated_at = NOW() " + "WHERE task_id = #{taskId} AND company_id = #{companyId}") int updateResultJson(@Param("taskId") Long taskId, - @Param("resultJson") String resultJson, - @Param("companyId") Long companyId); + @Param("resultJson") String resultJson, + @Param("companyId") Long companyId); /** * 按任务 ID 查询标注结果。 @@ -33,4 +33,9 @@ public interface AnnotationResultMapper extends BaseMapper { */ @Select("SELECT * FROM annotation_result WHERE task_id = #{taskId}") AnnotationResult selectByTaskId(@Param("taskId") Long taskId); + + @Insert("INSERT INTO annotation_result (task_id, company_id, result_json, created_at, updated_at) " + + "VALUES (#{taskId}, #{companyId}, #{resultJson}::jsonb, NOW(), NOW())") + @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") + int insertWithJsonb(AnnotationResult result); } diff --git a/src/main/java/com/label/service/AiAnnotationAsyncService.java b/src/main/java/com/label/service/AiAnnotationAsyncService.java new file mode 100644 index 0000000..883b6c9 --- /dev/null +++ b/src/main/java/com/label/service/AiAnnotationAsyncService.java @@ -0,0 +1,143 @@ +package com.label.service; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.label.common.ai.AiServiceClient; +import com.label.common.context.CompanyContext; +import com.label.entity.AnnotationResult; +import com.label.entity.AnnotationTask; +import com.label.entity.SourceData; +import com.label.mapper.AnnotationResultMapper; +import com.label.mapper.AnnotationTaskMapper; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AiAnnotationAsyncService { + + private final AnnotationTaskMapper taskMapper; + private final ObjectMapper objectMapper; + private final AnnotationResultMapper resultMapper; + private final AiServiceClient aiServiceClient; + + @Async("aiTaskExecutor") + public void processAnnotation(Long taskId, Long companyId, SourceData source) { + CompanyContext.set(companyId); + + log.info("开始异步执行 AI 预标注,任务ID: {}", taskId); + String dataType = source.getDataType().toUpperCase(); + AiServiceClient.ExtractionResponse aiResponse = null; + int maxRetries = 2; + Exception lastException = null; + String finalStatus = "FAILED"; + + try { + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + if ("IMAGE".equals(dataType)) { + AiServiceClient.ImageExtractRequest req = AiServiceClient.ImageExtractRequest.builder() + .filePath(source.getFilePath()) + .taskId(taskId) + .build(); + aiResponse = aiServiceClient.extractImage(req); + } else { + AiServiceClient.TextExtractRequest req = AiServiceClient.TextExtractRequest.builder() + .filePath(source.getFilePath()) + .fileName(source.getFileName()) + .build(); + aiResponse = aiServiceClient.extractText(req); + } + if (aiResponse != null) { + log.info("AI 预标注成功,任务ID: {}, 尝试次数: {}", taskId, attempt); + break; + } + } catch (Exception e) { + lastException = e; + log.warn("AI 预标注调用失败(任务 {}),第 {} 次尝试:{}", taskId, attempt, e.getMessage()); + if (attempt < maxRetries) { + try { + Thread.sleep(1000L * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + + List items = Collections.emptyList(); + if (aiResponse != null && aiResponse.getItems() != null) { + items = aiResponse.getItems(); + } + + writeOrUpdateResult(taskId, companyId, items); + finalStatus = "COMPLETED"; + } catch (Exception e) { + lastException = e; + log.error("AI 预标注处理过程中发生未知异常,任务ID: {}", taskId, e); + finalStatus = "FAILED"; + } finally { + try { + AnnotationTask updateEntity = new AnnotationTask(); + updateEntity.setId(taskId); + updateEntity.setAiStatus(finalStatus); + + if ("FAILED".equals(finalStatus)) { + String reason = lastException != null ? lastException.getMessage() : "AI处理失败"; + if (reason != null && reason.length() > 500) { + reason = reason.substring(0, 500); + } + updateEntity.setRejectReason(reason); + } + + int rows = taskMapper.updateById(updateEntity); + log.info("异步 AI 预标注结束,任务ID: {}, 最终状态: {}, row {}", taskId, finalStatus, rows); + } catch (Exception updateEx) { + log.error("更新任务 AI 状态失败,任务ID: {}", taskId, updateEx); + } finally { + CompanyContext.clear(); + } + } + } + + private void writeOrUpdateResult(Long taskId, Long companyId, List items) { + try { + String json = objectMapper + .writeValueAsString(Map.of("items", items != null ? items : Collections.emptyList())); + + int updated = resultMapper.updateResultJson(taskId, json, companyId); + + if (updated == 0) { + try { + AnnotationResult result = new AnnotationResult(); + result.setTaskId(taskId); + result.setCompanyId(companyId); + result.setResultJson(json); + resultMapper.insertWithJsonb(result); + log.info("新建AI预标注结果,任务ID: {}", taskId); + } catch (Exception insertEx) { + if (insertEx.getMessage() != null && insertEx.getMessage().contains("duplicate key")) { + log.warn("检测到并发插入冲突,转为更新模式,任务ID: {}", taskId); + resultMapper.updateResultJson(taskId, json, companyId); + } else { + throw insertEx; + } + } + } else { + log.info("更新AI预标注结果,任务ID: {}", taskId); + } + } catch (Exception e) { + log.error("写入 AI 预标注结果失败, taskId={}", taskId, e); + throw new RuntimeException("RESULT_WRITE_FAILED: " + e.getMessage(), e); + } + } +} diff --git a/src/main/java/com/label/service/ExtractionService.java b/src/main/java/com/label/service/ExtractionService.java index 0959770..ffd10ab 100644 --- a/src/main/java/com/label/service/ExtractionService.java +++ b/src/main/java/com/label/service/ExtractionService.java @@ -1,33 +1,30 @@ package com.label.service; -import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.label.common.ai.AiServiceClient; -import com.label.common.exception.BusinessException; -import com.label.common.auth.TokenPrincipal; -import com.label.common.statemachine.StateValidator; -import com.label.common.statemachine.TaskStatus; -import com.label.entity.AnnotationResult; -import com.label.entity.TrainingDataset; -import com.label.event.ExtractionApprovedEvent; -import com.label.mapper.AnnotationResultMapper; -import com.label.mapper.TrainingDatasetMapper; -import com.label.entity.SourceData; -import com.label.mapper.SourceDataMapper; -import com.label.entity.AnnotationTask; -import com.label.mapper.AnnotationTaskMapper; -import com.label.service.TaskClaimService; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; +import java.time.LocalDateTime; +import java.util.Map; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.time.LocalDateTime; -import java.util.Collections; -import java.util.Map; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.label.common.auth.TokenPrincipal; +import com.label.common.exception.BusinessException; +import com.label.common.statemachine.StateValidator; +import com.label.common.statemachine.TaskStatus; +import com.label.entity.AnnotationResult; +import com.label.entity.AnnotationTask; +import com.label.entity.SourceData; +import com.label.event.ExtractionApprovedEvent; +import com.label.mapper.AnnotationResultMapper; +import com.label.mapper.AnnotationTaskMapper; +import com.label.mapper.SourceDataMapper; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; /** * 提取阶段标注服务:AI 预标注、更新结果、提交、审批、驳回。 @@ -43,12 +40,13 @@ public class ExtractionService { private final AnnotationTaskMapper taskMapper; private final AnnotationResultMapper resultMapper; - private final TrainingDatasetMapper datasetMapper; + // private final TrainingDatasetMapper datasetMapper; private final SourceDataMapper sourceDataMapper; private final TaskClaimService taskClaimService; - private final AiServiceClient aiServiceClient; + // private final AiServiceClient aiServiceClient; private final ApplicationEventPublisher eventPublisher; private final ObjectMapper objectMapper; + private final AiAnnotationAsyncService aiAnnotationAsyncService; // 注入异步服务 @Value("${rustfs.bucket:label-source-data}") private String bucket; @@ -67,32 +65,30 @@ public class ExtractionService { throw new BusinessException("NOT_FOUND", "关联资料不存在", HttpStatus.NOT_FOUND); } - // 调用 AI 服务(在事务外,避免长时间持有 DB 连接) - AiServiceClient.ExtractionRequest req = AiServiceClient.ExtractionRequest.builder() - .sourceId(source.getId()) - .filePath(source.getFilePath()) - .bucket(bucket) - .build(); - - AiServiceClient.ExtractionResponse aiResponse; - try { - if ("IMAGE".equals(source.getDataType())) { - aiResponse = aiServiceClient.extractImage(req); - } else { - aiResponse = aiServiceClient.extractText(req); - } - } catch (Exception e) { - log.warn("AI 预标注调用失败(任务 {}):{}", taskId, e.getMessage()); - // AI 失败不阻塞流程,写入空结果 - aiResponse = new AiServiceClient.ExtractionResponse(); - aiResponse.setItems(Collections.emptyList()); + if (source.getFilePath() == null || source.getFilePath().isEmpty()) { + throw new BusinessException("INVALID_SOURCE", "源文件路径不能为空", HttpStatus.BAD_REQUEST); } - // 将 AI 结果写入 annotation_result(UPSERT 语义) - writeOrUpdateResult(taskId, principal.getCompanyId(), aiResponse.getItems()); - } + if (source.getDataType() == null || source.getDataType().isEmpty()) { + throw new BusinessException("INVALID_SOURCE", "数据类型不能为空", HttpStatus.BAD_REQUEST); + } - // ------------------------------------------------------------------ 更新结果 -- + String dataType = source.getDataType().toUpperCase(); + if (!"IMAGE".equals(dataType) && !"TEXT".equals(dataType)) { + log.warn("不支持的数据类型: {}, 任务ID: {}", dataType, taskId); + throw new BusinessException("UNSUPPORTED_TYPE", + "不支持的数据类型: " + dataType, HttpStatus.BAD_REQUEST); + } + + // 更新任务状态为 PROCESSING + taskMapper.update(null, new LambdaUpdateWrapper() + .eq(AnnotationTask::getId, taskId) + .set(AnnotationTask::getAiStatus, "PROCESSING")); + + // 触发异步任务 + aiAnnotationAsyncService.processAnnotation(taskId, principal.getCompanyId(), source); + // executeAiAnnotationAsync(taskId, principal.getCompanyId(), source); + } /** * 人工更新标注结果(整体覆盖,PUT 语义)。 @@ -237,8 +233,7 @@ public class ExtractionService { "sourceType", source != null ? source.getDataType() : "", "sourceFilePath", source != null && source.getFilePath() != null ? source.getFilePath() : "", "isFinal", task.getIsFinal() != null && task.getIsFinal(), - "resultJson", result != null ? result.getResultJson() : "[]" - ); + "resultJson", result != null ? result.getResultJson() : "[]"); } // ------------------------------------------------------------------ 私有工具 -- @@ -253,20 +248,4 @@ public class ExtractionService { } return task; } - - private void writeOrUpdateResult(Long taskId, Long companyId, java.util.List items) { - try { - String json = objectMapper.writeValueAsString(Map.of("items", items != null ? items : Collections.emptyList())); - int updated = resultMapper.updateResultJson(taskId, json, companyId); - if (updated == 0) { - AnnotationResult result = new AnnotationResult(); - result.setTaskId(taskId); - result.setCompanyId(companyId); - result.setResultJson(json); - resultMapper.insert(result); - } - } catch (Exception e) { - log.error("写入 AI 预标注结果失败: taskId={}", taskId, e); - } - } } diff --git a/src/main/java/com/label/service/FinetuneService.java b/src/main/java/com/label/service/FinetuneService.java index d8d55e6..396fe63 100644 --- a/src/main/java/com/label/service/FinetuneService.java +++ b/src/main/java/com/label/service/FinetuneService.java @@ -1,74 +1,73 @@ package com.label.service; import com.label.common.ai.AiServiceClient; -import com.label.common.exception.BusinessException; import com.label.common.auth.TokenPrincipal; +import com.label.common.exception.BusinessException; +import com.label.common.storage.RustFsClient; import com.label.entity.ExportBatch; import com.label.mapper.ExportBatchMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.util.Map; -/** - * GLM 微调服务:提交任务、查询状态。 - * - * 注意:trigger() 包含 AI HTTP 调用,不在 @Transactional 注解下。 - * 仅在 DB 写入时开启事务(updateFinetuneInfo)。 - */ @Slf4j @Service @RequiredArgsConstructor public class FinetuneService { + private static final String FINETUNE_BUCKET = "finetune-export"; + private static final int PRESIGNED_URL_MINUTES = 60; + private final ExportBatchMapper exportBatchMapper; private final ExportService exportService; private final AiServiceClient aiServiceClient; + private final RustFsClient rustFsClient; - // ------------------------------------------------------------------ 提交微调 -- + private String finetuneBaseModel = "qwen3-14b"; - /** - * 向 GLM AI 服务提交微调任务。 - * - * T074 设计:AI 调用不在 @Transactional 内执行,避免持有 DB 连接期间发起 HTTP 请求。 - * DB 写入(updateFinetuneInfo)是单条 UPDATE,不需要显式事务(自动提交)。 - * 如果 AI 调用成功但 DB 写入失败,下次查询状态仍可通过 AI 服务的 jobId 重建状态。 - * - * @param batchId 批次 ID - * @param principal 当前用户 - * @return 包含 glmJobId 和 finetuneStatus 的 Map - */ public Map trigger(Long batchId, TokenPrincipal principal) { ExportBatch batch = exportService.getById(batchId, principal); if (!"NOT_STARTED".equals(batch.getFinetuneStatus())) { - throw new BusinessException("FINETUNE_ALREADY_STARTED", - "微调任务已提交,当前状态: " + batch.getFinetuneStatus(), HttpStatus.CONFLICT); + throw new BusinessException( + "FINETUNE_ALREADY_STARTED", + "微调任务已提交,当前状态 " + batch.getFinetuneStatus(), + HttpStatus.CONFLICT + ); } - // 调用 AI 服务(无事务,不持有 DB 连接) - AiServiceClient.FinetuneRequest req = AiServiceClient.FinetuneRequest.builder() - .datasetPath(batch.getDatasetFilePath()) - .model("glm-4") - .batchId(batchId) + String jsonlUrl = rustFsClient.getPresignedUrl( + FINETUNE_BUCKET, + batch.getDatasetFilePath(), + PRESIGNED_URL_MINUTES + ); + + AiServiceClient.FinetuneStartRequest req = AiServiceClient.FinetuneStartRequest.builder() + .jsonlUrl(jsonlUrl) + .baseModel(finetuneBaseModel) + .hyperparams(Map.of()) .build(); - AiServiceClient.FinetuneResponse response; + AiServiceClient.FinetuneStartResponse response; try { response = aiServiceClient.startFinetune(req); } catch (Exception e) { - throw new BusinessException("FINETUNE_TRIGGER_FAILED", - "提交微调任务失败: " + e.getMessage(), HttpStatus.SERVICE_UNAVAILABLE); + throw new BusinessException( + "FINETUNE_TRIGGER_FAILED", + "提交微调任务失败: " + e.getMessage(), + HttpStatus.SERVICE_UNAVAILABLE + ); } - // AI 调用成功后更新批次记录(单条 UPDATE,自动提交) - exportBatchMapper.updateFinetuneInfo(batchId, - response.getJobId(), "RUNNING", principal.getCompanyId()); - - log.info("微调任务已提交: batchId={}, glmJobId={}", batchId, response.getJobId()); + exportBatchMapper.updateFinetuneInfo( + batchId, + response.getJobId(), + "RUNNING", + principal.getCompanyId() + ); return Map.of( "glmJobId", response.getJobId(), @@ -76,15 +75,6 @@ public class FinetuneService { ); } - // ------------------------------------------------------------------ 查询状态 -- - - /** - * 查询微调任务实时状态(向 AI 服务查询)。 - * - * @param batchId 批次 ID - * @param principal 当前用户 - * @return 状态 Map - */ public Map getStatus(Long batchId, TokenPrincipal principal) { ExportBatch batch = exportService.getById(batchId, principal); @@ -98,13 +88,11 @@ public class FinetuneService { ); } - // 向 AI 服务实时查询 AiServiceClient.FinetuneStatusResponse statusResp; try { statusResp = aiServiceClient.getFinetuneStatus(batch.getGlmJobId()); } catch (Exception e) { - log.warn("查询微调状态失败(batchId={}):{}", batchId, e.getMessage()); - // 查询失败时返回 DB 中的缓存状态 + log.warn("查询微调状态失败(batchId={}): {}", batchId, e.getMessage()); return Map.of( "batchId", batchId, "glmJobId", batch.getGlmJobId(), diff --git a/src/main/java/com/label/service/TaskService.java b/src/main/java/com/label/service/TaskService.java index 3771985..87a4fbc 100644 --- a/src/main/java/com/label/service/TaskService.java +++ b/src/main/java/com/label/service/TaskService.java @@ -190,6 +190,7 @@ public class TaskService { .sourceId(task.getSourceId()) .taskType(task.getTaskType()) .status(task.getStatus()) + .aiStatus(task.getAiStatus()) .claimedBy(task.getClaimedBy()) .claimedAt(task.getClaimedAt()) .submittedAt(task.getSubmittedAt()) diff --git a/src/main/java/com/label/service/VideoProcessService.java b/src/main/java/com/label/service/VideoProcessService.java index 549a3f2..410e594 100644 --- a/src/main/java/com/label/service/VideoProcessService.java +++ b/src/main/java/com/label/service/VideoProcessService.java @@ -1,17 +1,18 @@ package com.label.service; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.label.common.ai.AiServiceClient; import com.label.common.exception.BusinessException; -import com.label.common.statemachine.SourceStatus; import com.label.common.statemachine.StateValidator; +import com.label.common.statemachine.VideoSourceStatus; import com.label.entity.SourceData; -import com.label.mapper.SourceDataMapper; import com.label.entity.VideoProcessJob; +import com.label.mapper.SourceDataMapper; import com.label.mapper.VideoProcessJobMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -21,20 +22,6 @@ import org.springframework.transaction.support.TransactionSynchronizationManager import java.time.LocalDateTime; import java.util.Map; -/** - * 视频处理服务:创建任务、处理回调、管理员重置。 - * - * 状态流转: - * - 创建时:source_data → PREPROCESSING,job → PENDING - * - 回调成功:job → SUCCESS,source_data → PENDING(进入提取队列) - * - 回调失败(可重试):job → RETRYING,retryCount++,重新触发 AI - * - 回调失败(超出上限):job → FAILED,source_data → PENDING - * - 管理员重置:job → PENDING(可手动重新触发) - * - * T074 设计说明: - * AI 调用通过 TransactionSynchronizationManager.registerSynchronization().afterCommit() - * 延迟到事务提交后执行,避免在持有 DB 连接期间进行 HTTP 调用。 - */ @Slf4j @Service @RequiredArgsConstructor @@ -43,44 +30,27 @@ public class VideoProcessService { private final VideoProcessJobMapper jobMapper; private final SourceDataMapper sourceDataMapper; private final AiServiceClient aiServiceClient; + private final ObjectMapper objectMapper; - @Value("${rustfs.bucket:label-source-data}") - private String bucket; - - // ------------------------------------------------------------------ 创建任务 -- - - /** - * 创建视频处理任务并在事务提交后触发 AI 服务。 - * - * DB 写入(source_data→PREPROCESSING + 插入 job)在 @Transactional 内完成; - * AI 触发通过 afterCommit() 在事务提交后执行,不占用 DB 连接。 - * - * @param sourceId 资料 ID - * @param jobType 任务类型(FRAME_EXTRACT / VIDEO_TO_TEXT) - * @param params JSON 参数(如 {"frameInterval": 30}) - * @param companyId 租户 ID - * @return 新建的 VideoProcessJob - */ @Transactional - public VideoProcessJob createJob(Long sourceId, String jobType, - String params, Long companyId) { + public VideoProcessJob createJob(Long sourceId, String jobType, String params, Long companyId) { SourceData source = sourceDataMapper.selectById(sourceId); if (source == null || !companyId.equals(source.getCompanyId())) { - throw new BusinessException("NOT_FOUND", "资料不存在: " + sourceId, HttpStatus.NOT_FOUND); + throw new BusinessException("NOT_FOUND", "资料不存在 " + sourceId, HttpStatus.NOT_FOUND); } validateJobType(jobType); - // source_data → PREPROCESSING StateValidator.assertTransition( - SourceStatus.TRANSITIONS, - SourceStatus.valueOf(source.getStatus()), SourceStatus.PREPROCESSING); + VideoSourceStatus.TRANSITIONS, + VideoSourceStatus.valueOf(source.getStatus()), + VideoSourceStatus.PREPROCESSING + ); sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, sourceId) .set(SourceData::getStatus, "PREPROCESSING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); - // 插入 PENDING 任务 VideoProcessJob job = new VideoProcessJob(); job.setCompanyId(companyId); job.setSourceId(sourceId); @@ -91,48 +61,32 @@ public class VideoProcessService { job.setMaxRetries(3); jobMapper.insert(job); - // 事务提交后触发 AI(不在事务内,不占用 DB 连接) - final Long jobId = job.getId(); + final Long jobId = job.getId(); final String filePath = source.getFilePath(); final String finalJobType = jobType; + final String finalParams = job.getParams(); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { - triggerAi(jobId, sourceId, filePath, finalJobType); + triggerAi(jobId, sourceId, filePath, finalJobType, finalParams); } }); - log.info("视频处理任务已创建(AI 将在事务提交后触发): jobId={}, sourceId={}", jobId, sourceId); + log.info("视频处理任务已创建: jobId={}, sourceId={}", jobId, sourceId); return job; } - // ------------------------------------------------------------------ 处理回调 -- - - /** - * 处理 AI 服务异步回调(POST /api/video/callback,无需用户 Token)。 - * - * 幂等:若 job 已为 SUCCESS,直接返回,防止重复处理。 - * 重试触发同样延迟到事务提交后(afterCommit),不在事务内执行。 - * - * @param jobId 任务 ID - * @param callbackStatus AI 回调状态(SUCCESS / FAILED) - * @param outputPath 成功时的输出路径(可选) - * @param errorMessage 失败时的错误信息(可选) - */ @Transactional - public void handleCallback(Long jobId, String callbackStatus, - String outputPath, String errorMessage) { - // video_process_job 在 IGNORED_TABLES 中(回调无 CompanyContext),此处显式校验 + public void handleCallback(Long jobId, String callbackStatus, String outputPath, String errorMessage) { VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || job.getCompanyId() == null) { - log.warn("视频处理回调:job 不存在,jobId={}", jobId); + log.warn("视频处理回调时 job 不存在: jobId={}", jobId); return; } - // 幂等:已成功则忽略重复回调 if ("SUCCESS".equals(job.getStatus())) { - log.info("视频处理回调幂等:jobId={} 已为 SUCCESS,跳过", jobId); + log.info("视频处理回调幂等跳过: jobId={}", jobId); return; } @@ -143,28 +97,19 @@ public class VideoProcessService { } } - // ------------------------------------------------------------------ 管理员重置 -- - - /** - * 管理员手动重置失败任务(FAILED → PENDING)。 - * - * 仅允许 FAILED 状态的任务重置,重置后 retryCount 清零, - * 管理员可随后重新调用 createJob 触发处理。 - * - * @param jobId 任务 ID - * @param companyId 租户 ID - */ @Transactional public VideoProcessJob reset(Long jobId, Long companyId) { VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || !companyId.equals(job.getCompanyId())) { - throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND); + throw new BusinessException("NOT_FOUND", "视频处理任务不存在 " + jobId, HttpStatus.NOT_FOUND); } if (!"FAILED".equals(job.getStatus())) { - throw new BusinessException("INVALID_TRANSITION", - "只有 FAILED 状态的任务可以重置,当前状态: " + job.getStatus(), - HttpStatus.BAD_REQUEST); + throw new BusinessException( + "INVALID_TRANSITION", + "只有 FAILED 状态的任务可以重置,当前状态 " + job.getStatus(), + HttpStatus.BAD_REQUEST + ); } jobMapper.update(null, new LambdaUpdateWrapper() @@ -176,24 +121,18 @@ public class VideoProcessService { job.setStatus("PENDING"); job.setRetryCount(0); - log.info("视频处理任务已重置: jobId={}", jobId); return job; } - // ------------------------------------------------------------------ 查询 -- - public VideoProcessJob getJob(Long jobId, Long companyId) { VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || !companyId.equals(job.getCompanyId())) { - throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND); + throw new BusinessException("NOT_FOUND", "视频处理任务不存在 " + jobId, HttpStatus.NOT_FOUND); } return job; } - // ------------------------------------------------------------------ 私有方法 -- - private void handleSuccess(VideoProcessJob job, String outputPath) { - // job → SUCCESS jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "SUCCESS") @@ -201,13 +140,10 @@ public class VideoProcessService { .set(VideoProcessJob::getCompletedAt, LocalDateTime.now()) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); - // source_data PREPROCESSING → PENDING(进入提取队列) sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, job.getSourceId()) .set(SourceData::getStatus, "PENDING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); - - log.info("视频处理成功:jobId={}, sourceId={}", job.getId(), job.getSourceId()); } private void handleFailure(VideoProcessJob job, String errorMessage) { @@ -215,7 +151,6 @@ public class VideoProcessService { int maxRetries = job.getMaxRetries() != null ? job.getMaxRetries() : 3; if (newRetryCount < maxRetries) { - // 仍有重试次数:job → RETRYING,事务提交后重新触发 AI jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "RETRYING") @@ -223,26 +158,22 @@ public class VideoProcessService { .set(VideoProcessJob::getErrorMessage, errorMessage) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); - log.warn("视频处理失败,开始第 {} 次重试:jobId={}, error={}", - newRetryCount, job.getId(), errorMessage); - - // 重试 AI 触发延迟到事务提交后 SourceData source = sourceDataMapper.selectById(job.getSourceId()); if (source != null) { - final Long jobId = job.getId(); - final Long sourceId = job.getSourceId(); + final Long jobId = job.getId(); + final Long sourceId = job.getSourceId(); final String filePath = source.getFilePath(); - final String jobType = job.getJobType(); + final String jobType = job.getJobType(); + final String params = job.getParams(); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { - triggerAi(jobId, sourceId, filePath, jobType); + triggerAi(jobId, sourceId, filePath, jobType, params); } }); } } else { - // 超出最大重试次数:job → FAILED,source_data → PENDING jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "FAILED") @@ -251,40 +182,87 @@ public class VideoProcessService { .set(VideoProcessJob::getCompletedAt, LocalDateTime.now()) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); - // source_data PREPROCESSING → PENDING(管理员可重新处理) sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, job.getSourceId()) .set(SourceData::getStatus, "PENDING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); - - log.error("视频处理永久失败:jobId={}, sourceId={}, error={}", - job.getId(), job.getSourceId(), errorMessage); } } - private void triggerAi(Long jobId, Long sourceId, String filePath, String jobType) { - AiServiceClient.VideoProcessRequest req = AiServiceClient.VideoProcessRequest.builder() - .sourceId(sourceId) - .filePath(filePath) - .bucket(bucket) - .params(Map.of("jobId", jobId, "jobType", jobType)) - .build(); + private void triggerAi(Long jobId, Long sourceId, String filePath, String jobType, String paramsJson) { + Map params = parseParams(paramsJson); try { if ("FRAME_EXTRACT".equals(jobType)) { - aiServiceClient.extractFrames(req); + aiServiceClient.extractFrames(AiServiceClient.ExtractFramesRequest.builder() + .filePath(filePath) + .sourceId(sourceId) + .jobId(jobId) + .mode(stringParam(params, "mode", "interval")) + .frameInterval(intParam(params, "frameInterval", 30)) + .build()); } else { - aiServiceClient.videoToText(req); + aiServiceClient.videoToText(AiServiceClient.VideoToTextRequest.builder() + .filePath(filePath) + .sourceId(sourceId) + .jobId(jobId) + .startSec(doubleParam(params, "startSec", 0.0)) + .endSec(doubleParam(params, "endSec", 120.0)) + .model(stringParam(params, "model", null)) + .promptTemplate(stringParam(params, "promptTemplate", null)) + .build()); } - log.info("AI 触发成功: jobId={}", jobId); + log.info("AI 视频任务已触发: jobId={}", jobId); } catch (Exception e) { - log.error("触发视频处理 AI 失败(jobId={}):{},job 保持当前状态,需管理员手动重置", jobId, e.getMessage()); + log.error("触发视频处理 AI 失败(jobId={}): {}", jobId, e.getMessage()); } } + private Map parseParams(String paramsJson) { + if (paramsJson == null || paramsJson.isBlank()) { + return Map.of(); + } + try { + return objectMapper.readValue(paramsJson, new TypeReference<>() {}); + } catch (Exception e) { + log.warn("解析视频处理参数失败,将使用默认值: {}", e.getMessage()); + return Map.of(); + } + } + + private String stringParam(Map params, String key, String defaultValue) { + Object value = params.get(key); + return value == null ? defaultValue : String.valueOf(value); + } + + private Integer intParam(Map params, String key, Integer defaultValue) { + Object value = params.get(key); + if (value instanceof Number number) { + return number.intValue(); + } + if (value instanceof String text && !text.isBlank()) { + return Integer.parseInt(text); + } + return defaultValue; + } + + private Double doubleParam(Map params, String key, Double defaultValue) { + Object value = params.get(key); + if (value instanceof Number number) { + return number.doubleValue(); + } + if (value instanceof String text && !text.isBlank()) { + return Double.parseDouble(text); + } + return defaultValue; + } + private void validateJobType(String jobType) { if (!"FRAME_EXTRACT".equals(jobType) && !"VIDEO_TO_TEXT".equals(jobType)) { - throw new BusinessException("INVALID_JOB_TYPE", - "任务类型不合法,应为 FRAME_EXTRACT 或 VIDEO_TO_TEXT", HttpStatus.BAD_REQUEST); + throw new BusinessException( + "INVALID_JOB_TYPE", + "任务类型不合法,应为 FRAME_EXTRACT 或 VIDEO_TO_TEXT", + HttpStatus.BAD_REQUEST + ); } } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 9a5e608..ec319e9 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -64,8 +64,9 @@ rustfs: region: us-east-1 ai-service: - base-url: ${AI_SERVICE_BASE_URL:http://http://172.28.77.215:18000} - timeout: 30000 + base-url: ${AI_SERVICE_BASE_URL:http://172.28.77.215:18000} + #base-url: ${AI_SERVICE_BASE_URL:http://127.0.0.1:18000} + timeout: 300000 auth: enabled: true diff --git a/src/main/resources/sql/init.sql b/src/main/resources/sql/init.sql index 1824039..12b3622 100644 --- a/src/main/resources/sql/init.sql +++ b/src/main/resources/sql/init.sql @@ -87,6 +87,7 @@ CREATE TABLE IF NOT EXISTS annotation_task ( completed_at TIMESTAMP, is_final BOOLEAN NOT NULL DEFAULT FALSE, -- true 即 APPROVED 且无需再审 ai_model VARCHAR(50), + ai_status VARCHAR(20) NOT NULL DEFAULT 'PENDING', reject_reason TEXT, created_at TIMESTAMP NOT NULL DEFAULT NOW(), updated_at TIMESTAMP NOT NULL DEFAULT NOW() @@ -313,7 +314,7 @@ INSERT INTO sys_config (company_id, config_key, config_value, description) VALUES (NULL, 'token_ttl_seconds', '7200', '会话凭证有效期(秒)'), - (NULL, 'model_default', 'glm-4', + (NULL, 'model_default', 'qwen-plus', 'AI 辅助默认模型'), (NULL, 'video_frame_interval', '30', '视频帧提取间隔(帧数)'),