Compare commits

4 Commits

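Author | SHA1 | Message | Date
wh | bf0b00ed08 | Make extraction asynchronous and add an AI-assisted extraction status | 2026-04-17 01:20:27 +08:00
wh | ccbcfd2c74 | Add a 500M upload file size limit | 2026-04-15 23:22:14 +08:00
wh | 4708aa0f28 | Stop tracking design documents | 2026-04-15 18:25:07 +08:00
wh | 5a24ebd49b | Modify yaml | 2026-04-15 16:41:27 +08:00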
20 changed files with 599 additions and 1498 deletions

@@ -1,808 +0,0 @@
# label_backend Swagger DTO Annotation Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Make every public `label_backend` API parameter visible in Swagger with its name, type, requiredness, and meaning by replacing fixed `Map` request bodies with DTOs and adding OpenAPI annotations.
**Architecture:** Keep existing URLs, HTTP methods, JSON field names, service calls, and auth behavior unchanged. Add DTOs in the flat `com.label.dto` package, annotate Controller parameters and public schema models, and extend OpenAPI reflection tests so future endpoints cannot regress to undocumented parameters.
**Tech Stack:** Java 21, Spring Boot 3.1.5, springdoc-openapi 2.3.0, JUnit 5, AssertJ, Lombok.
---
## File Structure
### Create
- `src/main/java/com/label/dto/CompanyCreateRequest.java`
- `src/main/java/com/label/dto/CompanyUpdateRequest.java`
- `src/main/java/com/label/dto/CompanyStatusUpdateRequest.java`
- `src/main/java/com/label/dto/UserCreateRequest.java`
- `src/main/java/com/label/dto/UserUpdateRequest.java`
- `src/main/java/com/label/dto/UserStatusUpdateRequest.java`
- `src/main/java/com/label/dto/UserRoleUpdateRequest.java`
- `src/main/java/com/label/dto/CreateTaskRequest.java`
- `src/main/java/com/label/dto/TaskReassignRequest.java`
- `src/main/java/com/label/dto/RejectRequest.java`
- `src/main/java/com/label/dto/ExportBatchCreateRequest.java`
- `src/main/java/com/label/dto/VideoProcessCreateRequest.java`
- `src/main/java/com/label/dto/VideoProcessCallbackRequest.java`
- `src/main/java/com/label/dto/SysConfigUpdateRequest.java`
- `src/main/java/com/label/dto/SysConfigItemResponse.java`
- `src/main/java/com/label/dto/DynamicJsonResponse.java`
- `src/main/java/com/label/dto/FinetuneJobResponse.java`
### Modify
- `src/test/java/com/label/unit/OpenApiAnnotationTest.java`
- `src/main/java/com/label/common/result/Result.java`
- `src/main/java/com/label/common/result/PageResult.java`
- `src/main/java/com/label/dto/LoginRequest.java`
- `src/main/java/com/label/dto/LoginResponse.java`
- `src/main/java/com/label/dto/SourceResponse.java`
- `src/main/java/com/label/dto/TaskResponse.java`
- `src/main/java/com/label/dto/UserInfoResponse.java`
- `src/main/java/com/label/controller/AuthController.java`
- `src/main/java/com/label/controller/CompanyController.java`
- `src/main/java/com/label/controller/UserController.java`
- `src/main/java/com/label/controller/SourceController.java`
- `src/main/java/com/label/controller/TaskController.java`
- `src/main/java/com/label/controller/ExtractionController.java`
- `src/main/java/com/label/controller/QaController.java`
- `src/main/java/com/label/controller/ExportController.java`
- `src/main/java/com/label/controller/SysConfigController.java`
- `src/main/java/com/label/controller/VideoController.java`
- `src/main/java/com/label/entity/SysCompany.java`
- `src/main/java/com/label/entity/SysUser.java`
- `src/main/java/com/label/entity/SysConfig.java`
- `src/main/java/com/label/entity/TrainingDataset.java`
- `src/main/java/com/label/entity/ExportBatch.java`
- `src/main/java/com/label/entity/VideoProcessJob.java`
---
### Task 1: Add Failing OpenAPI Contract Tests
**Files:**
- Modify: `src/test/java/com/label/unit/OpenApiAnnotationTest.java`
- [ ] **Step 1: Add DTO and parameter coverage expectations**
Add tests that fail before implementation because new DTO classes do not exist, some endpoint parameters lack `@Parameter`, and request body parameters still use `Map`.
```java
@Test
@DisplayName("固定结构请求体不能继续使用 Map")
void fixedRequestBodiesDoNotUseRawMap() {
for (Class<?> controller : CONTROLLERS) {
Arrays.stream(controller.getDeclaredMethods())
.filter(OpenApiAnnotationTest::isEndpointMethod)
.flatMap(method -> Arrays.stream(method.getParameters()))
.filter(parameter -> parameter.isAnnotationPresent(org.springframework.web.bind.annotation.RequestBody.class))
.forEach(parameter -> assertThat(parameter.getType())
.as(controller.getSimpleName() + " request body should use an explicit DTO")
.isNotEqualTo(java.util.Map.class));
}
}
@Test
@DisplayName("所有 endpoint 公开参数都声明 @Parameter 或请求体说明")
void endpointParametersHaveOpenApiDescriptions() {
for (Class<?> controller : CONTROLLERS) {
Arrays.stream(controller.getDeclaredMethods())
.filter(OpenApiAnnotationTest::isEndpointMethod)
.forEach(method -> Arrays.stream(method.getParameters())
.filter(OpenApiAnnotationTest::isPublicApiParameter)
.forEach(parameter -> assertThat(
parameter.isAnnotationPresent(io.swagger.v3.oas.annotations.Parameter.class)
|| parameter.isAnnotationPresent(io.swagger.v3.oas.annotations.parameters.RequestBody.class))
.as(controller.getSimpleName() + "." + method.getName()
+ " parameter " + parameter.getName()
+ " should have OpenAPI description")
.isTrue()));
}
}
```
Add helper:
```java
private static boolean isPublicApiParameter(java.lang.reflect.Parameter parameter) {
return parameter.isAnnotationPresent(org.springframework.web.bind.annotation.PathVariable.class)
|| parameter.isAnnotationPresent(org.springframework.web.bind.annotation.RequestParam.class)
|| parameter.isAnnotationPresent(org.springframework.web.bind.annotation.RequestBody.class);
}
```
- [ ] **Step 2: Expand DTO and entity schema coverage**
Update DTO coverage lists so the test will fail until new DTOs and exposed entities are annotated.
```java
private static final List<Class<?>> DTOS = List.of(
LoginRequest.class,
LoginResponse.class,
UserInfoResponse.class,
TaskResponse.class,
SourceResponse.class,
CompanyCreateRequest.class,
CompanyUpdateRequest.class,
CompanyStatusUpdateRequest.class,
UserCreateRequest.class,
UserUpdateRequest.class,
UserStatusUpdateRequest.class,
UserRoleUpdateRequest.class,
CreateTaskRequest.class,
TaskReassignRequest.class,
RejectRequest.class,
ExportBatchCreateRequest.class,
VideoProcessCreateRequest.class,
VideoProcessCallbackRequest.class,
SysConfigUpdateRequest.class,
SysConfigItemResponse.class,
DynamicJsonResponse.class,
FinetuneJobResponse.class
);
private static final List<Class<?>> EXPOSED_SCHEMAS = List.of(
Result.class,
PageResult.class,
SysCompany.class,
SysUser.class,
SysConfig.class,
TrainingDataset.class,
ExportBatch.class,
VideoProcessJob.class
);
```
Add test:
```java
@Test
@DisplayName("公开响应模型都声明 @Schema")
void exposedModelsHaveSchema() {
assertThat(EXPOSED_SCHEMAS)
.allSatisfy(model ->
assertThat(model.getAnnotation(Schema.class))
.as(model.getSimpleName() + " should have @Schema")
.isNotNull());
}
```
- [ ] **Step 3: Run test and verify RED**
Run:
```bash
mvn -Dtest=OpenApiAnnotationTest test
```
Expected: FAIL because the new DTO classes are missing and existing Controller parameters are not fully annotated.
### Task 2: Create Explicit Request and Response DTOs
**Files:**
- Create all DTO files listed in File Structure
- [ ] **Step 1: Add company request DTOs**
Create `CompanyCreateRequest`, `CompanyUpdateRequest`, and `CompanyStatusUpdateRequest` with these fields:
```java
@Data
@Schema(description = "创建公司请求")
public class CompanyCreateRequest {
@Schema(description = "公司名称", example = "示例公司")
private String companyName;
@Schema(description = "公司代码,英文或数字短标识", example = "DEMO")
private String companyCode;
}
```
`CompanyUpdateRequest` uses the same fields with description "更新公司信息请求".
`CompanyStatusUpdateRequest`:
```java
@Data
@Schema(description = "更新公司状态请求")
public class CompanyStatusUpdateRequest {
@Schema(description = "公司状态,可选值:ACTIVE、DISABLED", example = "ACTIVE")
private String status;
}
```
- [ ] **Step 2: Add user request DTOs**
Create:
```java
@Data
@Schema(description = "创建用户请求")
public class UserCreateRequest {
@Schema(description = "登录用户名", example = "annotator01")
private String username;
@Schema(description = "明文初始密码", example = "Passw0rd!")
private String password;
@Schema(description = "真实姓名", example = "张三")
private String realName;
@Schema(description = "用户角色,可选值:ADMIN、REVIEWER、ANNOTATOR、UPLOADER", example = "ANNOTATOR")
private String role;
}
```
`UserUpdateRequest` fields:
```java
@Schema(description = "真实姓名", example = "张三")
private String realName;
@Schema(description = "新密码;为空时不修改密码", example = "NewPassw0rd!")
private String password;
```
`UserStatusUpdateRequest`:
```java
@Schema(description = "用户状态,可选值:ACTIVE、DISABLED", example = "ACTIVE")
private String status;
```
`UserRoleUpdateRequest`:
```java
@Schema(description = "用户角色,可选值:ADMIN、REVIEWER、ANNOTATOR、UPLOADER", example = "REVIEWER")
private String role;
```
- [ ] **Step 3: Add task request DTOs**
Create:
```java
@Data
@Schema(description = "创建标注任务请求")
public class CreateTaskRequest {
@Schema(description = "关联资料 ID", example = "1001")
private Long sourceId;
@Schema(description = "任务类型,可选值:EXTRACTION、QA_GENERATION", example = "EXTRACTION")
private String taskType;
}
```
```java
@Data
@Schema(description = "管理员强制指派任务请求")
public class TaskReassignRequest {
@Schema(description = "目标用户 ID", example = "2001")
private Long userId;
}
```
- [ ] **Step 4: Add common moderation and dynamic JSON DTOs**
Create:
```java
@Data
@Schema(description = "审批驳回请求")
public class RejectRequest {
@Schema(description = "驳回原因", example = "证据不足,请补充来源片段")
private String reason;
}
```
```java
@Data
@Schema(description = "动态业务 JSON 响应")
public class DynamicJsonResponse {
@Schema(description = "动态业务数据;提取阶段通常为三元组/四元组结构,问答阶段通常为问答对结构")
private Map<String, Object> content;
}
```
- [ ] **Step 5: Add export and finetune DTOs**
Create:
```java
@Data
@Schema(description = "创建训练数据导出批次请求")
public class ExportBatchCreateRequest {
@Schema(description = "训练样本 ID 列表", example = "[1,2,3]")
private List<Long> sampleIds;
}
```
```java
@Data
@Schema(description = "微调任务响应")
public class FinetuneJobResponse {
@Schema(description = "导出批次 ID", example = "10")
private Long batchId;
@Schema(description = "微调任务 ID", example = "glm-ft-abc123")
private String finetuneJobId;
@Schema(description = "微调任务状态,可选值:PENDING、RUNNING、SUCCESS、FAILED", example = "RUNNING")
private String status;
@Schema(description = "错误信息;任务失败时返回", example = "training file not found")
private String errorMessage;
}
```
- [ ] **Step 6: Add video and config DTOs**
Create:
```java
@Data
@Schema(description = "创建视频处理任务请求")
public class VideoProcessCreateRequest {
@Schema(description = "视频资料 ID", example = "1001")
private Long sourceId;
@Schema(description = "任务类型,可选值:EXTRACT_FRAMES、VIDEO_TO_TEXT", example = "EXTRACT_FRAMES")
private String jobType;
@Schema(description = "任务参数 JSON 字符串,例如抽帧模式、帧间隔或起止时间", example = "{\"mode\":\"interval\",\"frameInterval\":30}")
private String params;
}
```
```java
@Data
@Schema(description = "AI 服务视频处理回调请求")
public class VideoProcessCallbackRequest {
@Schema(description = "视频处理任务 ID", example = "123")
private Long jobId;
@Schema(description = "处理状态,可选值:SUCCESS、FAILED", example = "SUCCESS")
private String status;
@Schema(description = "输出文件路径;成功时返回", example = "frames/1001/0.jpg")
private String outputPath;
@Schema(description = "错误信息;失败时返回", example = "video decode failed")
private String errorMessage;
}
```
```java
@Data
@Schema(description = "系统配置更新请求")
public class SysConfigUpdateRequest {
@Schema(description = "配置值", example = "glm-4-flash")
private String value;
@Schema(description = "配置说明", example = "默认文本模型")
private String description;
}
```
```java
@Data
@Schema(description = "系统配置项响应")
public class SysConfigItemResponse {
@Schema(description = "配置键", example = "ai.defaultTextModel")
private String configKey;
@Schema(description = "配置值", example = "glm-4-flash")
private String configValue;
@Schema(description = "配置说明", example = "默认文本模型")
private String description;
@Schema(description = "配置作用域,可选值:GLOBAL、COMPANY", example = "COMPANY")
private String scope;
}
```
- [ ] **Step 7: Run test and verify partial GREEN**
Run:
```bash
mvn -Dtest=OpenApiAnnotationTest test
```
Expected: the test still fails, but the missing-class errors are gone. Failures should now point to missing Controller parameter annotations and raw `Map` request bodies.
### Task 3: Annotate Common Models, Existing DTOs, and Exposed Entities
**Files:**
- Modify common result classes, existing DTOs, and exposed entity files listed above
- [ ] **Step 1: Add `@Schema` to `Result<T>` and `PageResult<T>`**
Update `Result<T>`:
```java
@Data
@Schema(description = "统一接口响应包装")
public class Result<T> {
@Schema(description = "业务状态码;成功为 SUCCESS,失败为具体错误码", example = "SUCCESS")
private String code;
@Schema(description = "接口返回主体;不同接口类型不同")
private T data;
@Schema(description = "响应消息;失败时说明错误原因", example = "参数不合法")
private String message;
}
```
Update `PageResult<T>` similarly for `items`, `total`, `page`, `pageSize`.
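As a rough illustration of the four fields named above, the sketch below models the pagination wrapper as a plain Java record. It is a stand-in under stated assumptions: the real `PageResult<T>` is presumably a Lombok `@Data` class that would carry `@Schema` annotations, which are left out here so the example compiles with the JDK alone. The name `PageResultSketch` and the `totalPages` helper are hypothetical and not part of the plan.

```java
import java.util.List;

// Dependency-free stand-in for PageResult<T>; the real class would put
// @Schema(description = ..., example = ...) on each field.
record PageResultSketch<T>(
        List<T> items,   // records on the current page
        long total,      // total number of matching records
        int page,        // current page number, starting at 1
        int pageSize) {  // number of records per page

    // Hypothetical helper (not in the plan): pages implied by total and pageSize.
    long totalPages() {
        return pageSize <= 0 ? 0 : (total + pageSize - 1) / pageSize;
    }
}
```

Writing the field meanings next to the fields in this way is the same information the `@Schema` descriptions would surface in Swagger.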
- [ ] **Step 2: Complete existing DTO field descriptions**
Ensure every field in `LoginRequest`, `LoginResponse`, `UserInfoResponse`, `TaskResponse`, and `SourceResponse` has `@Schema(description = ..., example = ...)`.
- [ ] **Step 3: Add class and field schemas to exposed entities**
For each exposed entity, add class-level `@Schema(description = "...")` and field-level `@Schema`.
Use these class descriptions:
- `SysCompany`: "租户公司"
- `SysUser`: "系统用户"
- `SysConfig`: "系统配置"
- `TrainingDataset`: "训练样本数据"
- `ExportBatch`: "训练数据导出批次"
- `VideoProcessJob`: "视频处理任务"
- [ ] **Step 4: Run schema coverage test**
Run:
```bash
mvn -Dtest=OpenApiAnnotationTest test
```
Expected: the raw `Map` body and parameter annotation tests still fail; schema coverage should pass.
### Task 4: Refactor Controller Request Bodies to DTOs
**Files:**
- Modify Controller files listed in File Structure
- [ ] **Step 1: Replace company `Map` request bodies**
In `CompanyController`, replace:
```java
public Result<SysCompany> create(@RequestBody Map<String, String> body)
```
with:
```java
public Result<SysCompany> create(
@io.swagger.v3.oas.annotations.parameters.RequestBody(
description = "创建公司请求体",
required = true)
@RequestBody CompanyCreateRequest body)
```
Call service with `body.getCompanyName()` and `body.getCompanyCode()`.
Apply the same pattern to `update` and `updateStatus`.
- [ ] **Step 2: Replace user `Map` request bodies**
In `UserController`, replace create/update/status/role request bodies with `UserCreateRequest`, `UserUpdateRequest`, `UserStatusUpdateRequest`, and `UserRoleUpdateRequest`.
Keep service calls unchanged except changing `body.get("field")` to DTO getters.
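The getter swap in this step can be sketched without Spring or Lombok. With a `Map` body a mistyped key still compiles and silently yields `null`, whereas a DTO getter is checked by the compiler. `UserStatusUpdateSketch` and `BodyAccessSketch` are hypothetical names used only for illustration; the real `UserStatusUpdateRequest` would get its accessors from Lombok.

```java
import java.util.Map;

// Stand-in for UserStatusUpdateRequest; the real class relies on Lombok @Data.
class UserStatusUpdateSketch {
    private String status;

    String getStatus() { return status; }

    void setStatus(String status) { this.status = status; }
}

class BodyAccessSketch {
    // Before: the field name is a string, so a typo compiles and returns null.
    static String statusFromMap(Map<String, String> body) {
        return body.get("status");
    }

    // After: the field name is a method, so a typo fails at compile time.
    static String statusFromDto(UserStatusUpdateSketch body) {
        return body.getStatus();
    }
}
```

The service call itself does not change; only the expression that reads the field does.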
- [ ] **Step 3: Replace task request bodies**
In `TaskController`, use `CreateTaskRequest` and `TaskReassignRequest`.
`createTask` service call becomes:
```java
taskService.createTask(body.getSourceId(), body.getTaskType(), principal.getCompanyId())
```
`reassign` target becomes:
```java
Long targetUserId = body.getUserId();
```
- [ ] **Step 4: Replace export batch request body**
In `ExportController#createBatch`, use `ExportBatchCreateRequest`.
Service call becomes:
```java
return Result.success(exportService.createBatch(body.getSampleIds(), principal(request)));
```
- [ ] **Step 5: Replace config update request body**
In `SysConfigController#updateConfig`, use `SysConfigUpdateRequest`.
Service call uses `body.getValue()` and `body.getDescription()`.
- [ ] **Step 6: Replace video request bodies**
In `VideoController#createJob`, use `VideoProcessCreateRequest`.
In `VideoController#handleCallback`, use `VideoProcessCallbackRequest`.
Keep callback secret header logic unchanged.
- [ ] **Step 7: Replace reject request bodies**
In `ExtractionController#reject` and `QaController#reject`, use `RejectRequest`.
Set:
```java
String reason = body != null ? body.getReason() : null;
```
- [ ] **Step 8: Run raw Map body test**
Run:
```bash
mvn -Dtest=OpenApiAnnotationTest test
```
Expected: raw `Map` request-body test passes. Parameter description test may still fail until Task 5.
### Task 5: Add Controller Parameter and Request Body OpenAPI Descriptions
**Files:**
- Modify all Controller files
- [ ] **Step 1: Add imports**
Use:
```java
import io.swagger.v3.oas.annotations.Parameter;
```
Use fully qualified `io.swagger.v3.oas.annotations.parameters.RequestBody` for Swagger request body annotations to avoid conflict with Spring `@RequestBody`.
- [ ] **Step 2: Annotate pagination query parameters**
For every `page` parameter:
```java
@Parameter(description = "页码,从 1 开始", example = "1")
@RequestParam(defaultValue = "1") int page
```
For every `pageSize` parameter:
```java
@Parameter(description = "每页条数", example = "20")
@RequestParam(defaultValue = "20") int pageSize
```
- [ ] **Step 3: Annotate status and type filters**
Examples:
```java
@Parameter(description = "任务状态过滤,可选值:UNCLAIMED、IN_PROGRESS、SUBMITTED、APPROVED、REJECTED", example = "SUBMITTED")
@RequestParam(required = false) String status
```
```java
@Parameter(description = "任务类型过滤,可选值:EXTRACTION、QA_GENERATION", example = "EXTRACTION")
@RequestParam(required = false) String taskType
```
- [ ] **Step 4: Annotate path variables**
Examples:
```java
@Parameter(description = "任务 ID", example = "1001")
@PathVariable Long id
```
```java
@Parameter(description = "视频处理任务 ID", example = "123")
@PathVariable Long jobId
```
```java
@Parameter(description = "系统配置键", example = "ai.defaultTextModel")
@PathVariable String key
```
- [ ] **Step 5: Annotate multipart upload parameters**
In `SourceController#upload`:
```java
@Parameter(description = "上传文件,支持文本、图片、视频", required = true)
@RequestParam("file") MultipartFile file
```
```java
@Parameter(description = "资料类型,可选值:text、image、video", example = "text", required = true)
@RequestParam("dataType") String dataType
```
- [ ] **Step 6: Annotate request bodies**
Every Spring `@RequestBody` parameter should also have Swagger request-body annotation:
```java
@io.swagger.v3.oas.annotations.parameters.RequestBody(
description = "创建任务请求体",
required = true)
@RequestBody CreateTaskRequest body
```
- [ ] **Step 7: Run parameter coverage test**
Run:
```bash
mvn -Dtest=OpenApiAnnotationTest test
```
Expected: PASS for `OpenApiAnnotationTest`.
### Task 6: Make Fixed Map Responses Swagger-Readable
**Files:**
- Modify `ExportController.java`
- Modify `SysConfigController.java`
- Modify `ExtractionController.java`
- Modify `QaController.java`
- Modify `FinetuneService.java` only if needed to map stable fields cleanly
- [ ] **Step 1: Convert config list response**
Change `SysConfigController#listConfig` return type from:
```java
Result<List<Map<String, Object>>>
```
to:
```java
Result<List<SysConfigItemResponse>>
```
Map each item from service map using keys currently returned by `SysConfigService.list`.
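A minimal sketch of that mapping follows, using only the JDK. The map keys (`configKey`, `configValue`, `description`, `scope`) are assumptions that mirror the `SysConfigItemResponse` fields from Task 2; the keys actually returned by `SysConfigService.list` need to be confirmed first. `SysConfigItemSketch` and `SysConfigMappingSketch` are hypothetical stand-ins for the Lombok-based DTO and the Controller code.

```java
import java.util.List;
import java.util.Map;

// Plain stand-in for the Lombok-based SysConfigItemResponse from Task 2.
record SysConfigItemSketch(String configKey, String configValue, String description, String scope) {}

class SysConfigMappingSketch {
    // Converts one service-layer map into the typed response item.
    // The keys used here are assumptions; align them with SysConfigService.list.
    static SysConfigItemSketch toItem(Map<String, Object> row) {
        return new SysConfigItemSketch(
                asString(row.get("configKey")),
                asString(row.get("configValue")),
                asString(row.get("description")),
                asString(row.get("scope")));
    }

    // Converts the whole list while preserving order.
    static List<SysConfigItemSketch> toItems(List<Map<String, Object>> rows) {
        return rows.stream().map(SysConfigMappingSketch::toItem).toList();
    }

    // Null-safe conversion so a missing key becomes null instead of throwing.
    private static String asString(Object value) {
        return value == null ? null : String.valueOf(value);
    }
}
```

Keeping the conversion in one small helper leaves the Service untouched, which matches the plan's goal of changing only the Controller boundary.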
- [ ] **Step 2: Wrap dynamic extraction and QA responses**
Change `ExtractionController#getResult` and `QaController#getResult` return types from `Result<Map<String, Object>>` to `Result<DynamicJsonResponse>`.
Implementation:
```java
DynamicJsonResponse response = new DynamicJsonResponse();
response.setContent(extractionService.getResult(taskId, principal(request)));
return Result.success(response);
```
Use the same pattern for QA.
- [ ] **Step 3: Keep raw JSON update request bodies as raw strings**
Do not change `PUT /api/extraction/{taskId}` or `PUT /api/qa/{taskId}` from raw string request body unless compatibility is explicitly approved.
Instead, annotate the raw string body with a Swagger request-body description explaining it is a complete JSON string.
- [ ] **Step 4: Convert finetune responses only if fields are stable**
Inspect `FinetuneService#trigger` and `FinetuneService#getStatus`.
If returned maps have stable keys, convert Controller responses to `FinetuneJobResponse` by mapping the known keys.
If keys are not stable, keep `Map<String,Object>` response but document `@Operation(description = "...")` and do not convert response type.
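What counts as "stable keys" can be made concrete with a small check. The sketch below is one possible reading, not the plan's prescribed method: it treats a map as stable when it uses only expected keys and always carries the job ID and status. The key names are assumptions taken from the `FinetuneJobResponse` fields in Task 2, and `FinetuneKeyCheckSketch` is a hypothetical name; the maps actually returned by `FinetuneService` still need to be inspected.

```java
import java.util.Map;
import java.util.Set;

class FinetuneKeyCheckSketch {
    // Assumed key names, mirroring the FinetuneJobResponse fields in Task 2.
    static final Set<String> EXPECTED_KEYS =
            Set.of("batchId", "finetuneJobId", "status", "errorMessage");

    // True when the map uses only expected keys and always carries the identifying ones.
    static boolean hasStableShape(Map<String, Object> result) {
        return EXPECTED_KEYS.containsAll(result.keySet())
                && result.containsKey("finetuneJobId")
                && result.containsKey("status");
    }
}
```

If a check like this fails for real responses, that is a signal to keep the `Map<String,Object>` return type and document it through `@Operation` instead.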
- [ ] **Step 5: Run compile test**
Run:
```bash
mvn -DskipTests compile
```
Expected: PASS.
### Task 7: Run Focused Compatibility Tests
**Files:**
- Modify integration tests only if DTO request binding requires updating test payload helpers
- [ ] **Step 1: Run OpenAPI annotation test**
Run:
```bash
mvn -Dtest=OpenApiAnnotationTest test
```
Expected: PASS.
- [ ] **Step 2: Run API flows affected by DTO request body replacement**
Run:
```bash
mvn -Dtest=UserManagementIntegrationTest,SysConfigIntegrationTest,ExportIntegrationTest,VideoCallbackIdempotencyTest test
```
Expected: PASS. If Docker/Testcontainers is unavailable, record the failure reason and run `mvn -DskipTests compile` plus `OpenApiAnnotationTest`.
- [ ] **Step 3: Run full test suite if environment supports Docker**
Run:
```bash
mvn test
```
Expected: PASS, or Docker/Testcontainers-specific skip/failure documented.
### Task 8: Final Documentation and Review
**Files:**
- Modify: `README.md` only if implementation reveals stricter constraints than already documented
- Modify: `docs/superpowers/specs/2026-04-15-label-backend-swagger-annotations-design.md` only if a design decision changes during implementation
- [ ] **Step 1: Check for remaining raw request body maps**
Run:
```bash
rg -n "@RequestBody Map<|Map<String, Object> body|Map<String, String> body" src/main/java/com/label/controller
```
Expected: no fixed-structure request-body maps remain. Dynamic non-request-body maps may remain only where intentionally documented.
- [ ] **Step 2: Check for missing `@Parameter` on public endpoint params**
Run:
```bash
mvn -Dtest=OpenApiAnnotationTest test
```
Expected: PASS.
- [ ] **Step 3: Review git diff**
Run:
```bash
git diff -- src/main/java src/test/java README.md docs/superpowers
```
Expected: diff only contains Swagger annotations, DTO additions, DTO request binding updates, and matching tests.
- [ ] **Step 4: Keep implementation uncommitted for review**
Do not commit unless the user explicitly asks for it.
Run:
```bash
git status --short
```
Expected: implementation remains available in the worktree for review.

@@ -1,301 +0,0 @@
# label_backend Swagger Parameter Annotation and DTO Conversion Design
> Date: 2026-04-15
> Scope: Swagger/OpenAPI documentation enhancements for the public REST API of `label_backend`
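## 1. Background
`label_backend` already uses `@Tag` and `@Operation` in the Controller layer, and a few DTOs already carry `@Schema` annotations, so the Swagger page can show the basic endpoint list and some object structures.
Several clear problems remain:
- Many path, query, and form parameters lack a description of their name, type, allowed values, and meaning
- Several endpoints still use `Map<String, Object>` or `Map<String, String>` as the request body, so Swagger cannot accurately show field names, field types, or field descriptions
- Some fixed-structure responses are still returned as `Map<String, Object>`, so Swagger can only show an anonymous object
- Some endpoints expose entity classes directly, and the entity fields do not yet have Swagger field descriptions
- The common response wrappers `Result<T>` and `PageResult<T>` have no field-level documentation
The user's goal is clear: see the name, type, and meaning of every endpoint parameter in Swagger.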
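## 2. Goals
After this change, the Swagger page of `label_backend` should meet the following goals:
- All public endpoints show clear descriptions for path, query, request header, and form parameters
- All fixed-structure request bodies are modeled as DTOs, with name, type, requiredness, and meaning given for every field
- All major fixed-structure response objects show field descriptions
- The field meanings of the pagination and unified response wrappers are clearly visible
- Existing endpoint paths, HTTP methods, and field names do not change, keeping compatibility with existing callers as far as possible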
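## 3. Non-Goals
This change does not:
- Restructure the overall business flow
- Change endpoint URLs, HTTP methods, or the permission policy
- Force every entity class to be replaced by a dedicated Response DTO
- Rebuild fully dynamic business JSON into complex deep objects; only fixed boundaries get an explicit wrapper
- Do unrelated code cleanup or directory restructuring along the way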
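## 4. Design Principles
### 4.1 DTO-first
Any fixed-structure request body whose field names, types, and descriptions Swagger needs to show clearly should use a DTO rather than `Map<String, Object>`.
### 4.2 Documentation first, behavior unchanged
The core of this change is making the API contract more readable, so:
- Controllers still call the existing Services
- Parameter field names stay the same
- Business semantics, state transitions, and permission checks stay the same
### 4.3 Make fixed structures explicit, bound dynamic structures
If an endpoint returns a stable set of fields, it should use an explicit DTO or explicit field annotations on the object.
If the internal business result is itself still dynamic JSON, provide at least a fixed outer DTO that documents the outermost fields, so that Swagger does not show an anonymous `Map`.
### 4.4 Minimal, controlled changes
Prefer changing Controller entry points, DTO definitions, and the objects shown in Swagger, and avoid intruding into deep Service logic.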
## 5. Target Scope
### 5.1 Controller
This change covers the following 10 Controllers:
- `AuthController`
- `CompanyController`
- `ExportController`
- `ExtractionController`
- `QaController`
- `SourceController`
- `SysConfigController`
- `TaskController`
- `UserController`
- `VideoController`
### 5.2 Common response models
- `Result<T>`
- `PageResult<T>`
### 5.3 Existing DTOs
- `LoginRequest`
- `LoginResponse`
- `SourceResponse`
- `TaskResponse`
- `UserInfoResponse`
### 5.4 Entities currently exposed directly to Swagger
- `SysUser`
- `SysCompany`
- `SysConfig`
- `TrainingDataset`
- `ExportBatch`
- `VideoProcessJob`
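## 6. DTO Conversion Strategy
### 6.1 Request bodies that must be refactored from Map to DTO
The following endpoints should change from an anonymous request body to an explicit DTO:
- `TaskController#createTask`
  - Add `CreateTaskRequest`
- `TaskController#reassign`
  - Add `TaskReassignRequest`
- `VideoController#createJob`
  - Add `VideoProcessCreateRequest`
- `VideoController#handleCallback`
  - Add `VideoProcessCallbackRequest`
- `CompanyController#update`
  - Add an explicit request DTO
- `CompanyController#updateStatus`
  - Add an explicit request DTO
- `ExportController#createBatch`
  - Add an explicit request DTO
- Any anonymous request bodies still present in `UserController` are also converted to DTOs
### 6.2 Prefer DTOs for fixed-structure responses
The following responses, if currently `Map<String, Object>` with stable fields, should be consolidated into DTOs:
- Export and fine-tuning status responses
- Fixed-structure responses related to the video callback
- The system configuration item list response
### 6.3 Dynamic result scenarios
`ExtractionController` and `QaController` may still involve structured JSON results. They are handled as follows:
- If the outermost fields are stable, add an outer DTO that documents them
- If the inner `items` still allow dynamic content, describe at the DTO field level the business JSON structure that the field carries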
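## 7. Swagger Annotation Standards
### 7.1 Controller methods
Every endpoint method follows the same convention:
- `@Tag`
- `@Operation(summary = "...", description = "...")`
- Add `@Parameter` to path, query, request header, and form parameters
- Add `@io.swagger.v3.oas.annotations.parameters.RequestBody` to request bodies
Each parameter description covers at least:
- The business meaning of the parameter
- Whether it is required
- The range of enum values or typical values
- Pagination defaults or limits
### 7.2 DTO fields
All public request/response DTO fields use:
- `@Schema(description = "...", example = "...")`
Where necessary, also add:
- `@NotNull`
- `@NotBlank`
- `@Valid`
### 7.3 Common wrapper classes
`Result<T>` should document:
- `code`: business status code
- `data`: the response payload
- `message`: failure reason or supplementary note
`PageResult<T>` should document:
- `items`: data list for the current page
- `total`: total number of records
- `page`: current page number, starting at 1
- `pageSize`: number of records per page
### 7.4 Directly exposed entities
For entity classes currently exposed directly to Swagger, add `@Schema` descriptions to the fields, but do not force a conversion to Response DTOs in this change.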
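## 8. Code Organization
Keep the flat style under `dto/` and add request and response classes that match endpoint semantics, named after the business action, for example: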
- `CreateTaskRequest`
- `TaskReassignRequest`
- `VideoProcessCreateRequest`
- `VideoProcessCallbackRequest`
- `CompanyUpdateRequest`
- `CompanyStatusUpdateRequest`
- `ExportBatchCreateRequest`
If a response is used by only one Controller but has fixed fields, it also goes under `dto/` rather than being an inline `Map`.
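## 9. Compatibility Requirements
To avoid affecting existing callers, this change must ensure that:
- Endpoint URLs do not change
- HTTP methods do not change
- JSON field names do not change
- Multipart parameter names do not change
- Path parameter names and query parameter names do not change
The DTOs only model the existing contract explicitly; they do not redesign it.
### 9.1 README constraint sync
This design requires not only the code changes but also that the Swagger/DTO constraints be written into the development conventions in the project `README.md`, as a long-term rule for future endpoint development.
The README should state at least the following requirements:
- Every public endpoint parameter must show its name, type, and meaning clearly in Swagger
- Fixed-structure request bodies must no longer use an anonymous `Map`; a DTO must be defined
- Fixed-structure responses should preferably be modeled explicitly, so that Swagger does not show anonymous objects
- The common response body and pagination wrapper must also keep their field descriptions up to date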
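## 10. Testing Strategy
Because this change replaces anonymous request bodies with DTOs, tests are needed to confirm that request binding has not been broken.
The testing strategy is:
- Add or update Controller tests first
- Cover JSON deserialization after the DTO replacement
- Verify that the request field names of key endpoints are unchanged
- Verify that the existing success paths and main failure paths still hold
At minimum, cover:
- Creating a task
- Reassigning a task
- Creating a video processing job
- Receiving the video callback
- Creating an export batch
- Company update and status update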
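## 11. Risks and Mitigations
### 11.1 Risk: mismatched field names break request binding
Mitigation:
- DTO fields strictly match the existing field names in the current request JSON
- Use Controller tests to verify compatibility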
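One dependency-free way to guard against this risk is a reflection check that a DTO declares exactly the field names existing callers already send. The sketch below is a stand-in under stated assumptions: `CreateTaskRequestSketch` and `DtoFieldNameCheck` are hypothetical names, the expected keys `sourceId` and `taskType` come from the implementation plan, and the check compares declared field names only, so it would not notice a rename applied through an annotation such as `@JsonProperty`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Stand-in for a request DTO; the real CreateTaskRequest uses Lombok and @Schema.
class CreateTaskRequestSketch {
    private Long sourceId;
    private String taskType;
}

class DtoFieldNameCheck {
    // Collects the declared, non-static, non-synthetic field names of a DTO class.
    static Set<String> fieldNames(Class<?> dto) {
        return Arrays.stream(dto.getDeclaredFields())
                .filter(f -> !Modifier.isStatic(f.getModifiers()) && !f.isSynthetic())
                .map(Field::getName)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // True when the DTO declares exactly the JSON keys existing callers already send.
    static boolean matchesContract(Class<?> dto, Set<String> expectedJsonKeys) {
        return fieldNames(dto).equals(new TreeSet<>(expectedJsonKeys));
    }
}
```

A check like this complements, rather than replaces, Controller tests that send real JSON payloads through the binding layer.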
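### 11.2 Risk: over-modeling dynamic JSON as DTOs complicates business boundaries
Mitigation:
- Model only fixed boundaries
- Keep dynamic business content as a controlled field, without digging too deep
### 11.3 Risk: entity classes have many fields, so annotation is a lot of work
Mitigation:
- Handle only the entities actually exposed to Swagger today
- Prioritize objects used by high-traffic endpoints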
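## 12. Implementation Order
The recommended order is:
1. Add field descriptions to `Result<T>` and `PageResult<T>`
2. Add `@Schema` to the existing public DTOs
3. Convert anonymous request bodies to DTOs and update the corresponding Controllers
4. Convert fixed-structure `Map` responses to DTOs
5. Add `@Schema` to the directly exposed entities
6. Add `@Parameter` annotations to Controller parameters uniformly
7. Update or add the related tests and run verification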
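## 13. Acceptance Criteria
Acceptance requires that:
- Every public endpoint parameter on the Swagger page shows its name, type, and meaning
- No fixed-structure request body is shown as an anonymous `Map`
- Major response objects have complete field descriptions
- The common response body has complete field descriptions
- `README.md` contains the Swagger/DTO documentation constraints
- Controller-related tests pass
- No breaking change to endpoint paths or field names is introduced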
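## 14. Decision Summary
This change adopts a "DTO-first API documentation" approach:
- Request bodies are converted to DTOs first
- Fixed-structure responses are preferably modeled explicitly
- Controller parameter annotations are unified
- The common response body and the currently exposed entities get complete Swagger field descriptions
- Swagger readability and API contract clarity are maximized without changing business semantics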

@@ -1,5 +1,6 @@
package com.label.common.ai;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
@@ -36,87 +37,190 @@ public class AiServiceClient {
@Data
@Builder
public static class ExtractionRequest {
private Long sourceId;
public static class TextExtractRequest {
@JsonProperty("file_path")
private String filePath;
private String bucket;
@JsonProperty("file_name")
private String fileName;
private String model;
private String prompt;
@JsonProperty("prompt_template")
private String promptTemplate;
}
@Data
@Builder
public static class ImageExtractRequest {
@JsonProperty("file_path")
private String filePath;
@JsonProperty("task_id")
private Long taskId;
private String model;
@JsonProperty("prompt_template")
private String promptTemplate;
}
@Data
public static class ExtractionResponse {
private List<Map<String, Object>> items; // triple/quadruple items
private String rawOutput;
}
@Data
@Builder
public static class VideoProcessRequest {
private Long sourceId;
public static class ExtractFramesRequest {
@JsonProperty("file_path")
private String filePath;
private String bucket;
private Map<String, Object> params; // frameInterval, mode etc.
@JsonProperty("source_id")
private Long sourceId;
@JsonProperty("job_id")
private Long jobId;
private String mode;
@JsonProperty("frame_interval")
private Integer frameInterval;
}
@Data
@Builder
public static class VideoToTextRequest {
@JsonProperty("file_path")
private String filePath;
@JsonProperty("source_id")
private Long sourceId;
@JsonProperty("job_id")
private Long jobId;
@JsonProperty("start_sec")
private Double startSec;
@JsonProperty("end_sec")
private Double endSec;
private String model;
@JsonProperty("prompt_template")
private String promptTemplate;
}
@Data
public static class TextQaItem {
private String subject;
private String predicate;
private String object;
@JsonProperty("source_snippet")
private String sourceSnippet;
}
@Data
@Builder
public static class GenTextQaRequest {
private List<TextQaItem> items;
private String model;
@JsonProperty("prompt_template")
private String promptTemplate;
}
@Data
public static class ImageQaItem {
private String subject;
private String predicate;
private String object;
private String qualifier;
@JsonProperty("cropped_image_path")
private String croppedImagePath;
}
@Data
@Builder
public static class GenImageQaRequest {
private List<ImageQaItem> items;
private String model;
@JsonProperty("prompt_template")
private String promptTemplate;
}
@Data
public static class QaGenResponse {
private List<Map<String, Object>> qaPairs;
private List<Map<String, Object>> pairs;
}
@Data
@Builder
public static class FinetuneRequest {
private String datasetPath; // RustFS path to JSONL file
private String model;
private Long batchId;
public static class FinetuneStartRequest {
@JsonProperty("jsonl_url")
private String jsonlUrl;
@JsonProperty("base_model")
private String baseModel;
private Map<String, Object> hyperparams;
}
@Data
public static class FinetuneResponse {
public static class FinetuneStartResponse {
@JsonProperty("job_id")
private String jobId;
private String status;
}
@Data
public static class FinetuneStatusResponse {
@JsonProperty("job_id")
private String jobId;
private String status; // PENDING/RUNNING/COMPLETED/FAILED
private Integer progress; // 0-100
@JsonProperty("error_message")
private String errorMessage;
}
// The 8 endpoints:
public ExtractionResponse extractText(ExtractionRequest request) {
return restTemplate.postForObject("/extract/text", request, ExtractionResponse.class);
public ExtractionResponse extractText(TextExtractRequest request) {
return restTemplate.postForObject("/api/v1/text/extract", request, ExtractionResponse.class);
}
public ExtractionResponse extractImage(ExtractionRequest request) {
return restTemplate.postForObject("/extract/image", request, ExtractionResponse.class);
public ExtractionResponse extractImage(ImageExtractRequest request) {
return restTemplate.postForObject("/api/v1/image/extract", request, ExtractionResponse.class);
}
public void extractFrames(VideoProcessRequest request) {
restTemplate.postForLocation("/video/extract-frames", request);
public void extractFrames(ExtractFramesRequest request) {
restTemplate.postForLocation("/api/v1/video/extract-frames", request);
}
public void videoToText(VideoProcessRequest request) {
restTemplate.postForLocation("/video/to-text", request);
public void videoToText(VideoToTextRequest request) {
restTemplate.postForLocation("/api/v1/video/to-text", request);
}
public QaGenResponse genTextQa(ExtractionRequest request) {
return restTemplate.postForObject("/qa/gen-text", request, QaGenResponse.class);
public QaGenResponse genTextQa(GenTextQaRequest request) {
return restTemplate.postForObject("/api/v1/qa/gen-text", request, QaGenResponse.class);
}
public QaGenResponse genImageQa(ExtractionRequest request) {
return restTemplate.postForObject("/qa/gen-image", request, QaGenResponse.class);
public QaGenResponse genImageQa(GenImageQaRequest request) {
return restTemplate.postForObject("/api/v1/qa/gen-image", request, QaGenResponse.class);
}
public FinetuneResponse startFinetune(FinetuneRequest request) {
return restTemplate.postForObject("/finetune/start", request, FinetuneResponse.class);
public FinetuneStartResponse startFinetune(FinetuneStartRequest request) {
return restTemplate.postForObject("/api/v1/finetune/start", request, FinetuneStartResponse.class);
}
public FinetuneStatusResponse getFinetuneStatus(String jobId) {
return restTemplate.getForObject("/finetune/status/{jobId}", FinetuneStatusResponse.class, jobId);
return restTemplate.getForObject("/api/v1/finetune/status/{jobId}", FinetuneStatusResponse.class, jobId);
}
}

View File

@@ -1,14 +0,0 @@
package com.label.common.statemachine;
import java.util.Map;
import java.util.Set;
public enum DatasetStatus {
PENDING_REVIEW, APPROVED, REJECTED;
public static final Map<DatasetStatus, Set<DatasetStatus>> TRANSITIONS = Map.of(
PENDING_REVIEW, Set.of(APPROVED, REJECTED),
REJECTED, Set.of(PENDING_REVIEW) // resubmitted for review
// APPROVED: terminal state
);
}
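
A minimal sketch of how a `TRANSITIONS` map like the one above can be checked before a status change. The class `TransitionSketch` and the helper `canTransition` are hypothetical names for illustration; the project's own `StateValidator.assertTransition` is not shown in this diff and may behave differently. The sketch assumes that a state with no entry in the map is terminal.

```java
import java.util.Map;
import java.util.Set;

final class TransitionSketch {
    enum DatasetStatus { PENDING_REVIEW, APPROVED, REJECTED }

    // Same shape as the removed DatasetStatus.TRANSITIONS map.
    static final Map<DatasetStatus, Set<DatasetStatus>> TRANSITIONS = Map.of(
            DatasetStatus.PENDING_REVIEW, Set.of(DatasetStatus.APPROVED, DatasetStatus.REJECTED),
            DatasetStatus.REJECTED, Set.of(DatasetStatus.PENDING_REVIEW));

    // Hypothetical helper: a state missing from the map has no outgoing transitions.
    static <S> boolean canTransition(Map<S, Set<S>> transitions, S from, S to) {
        return transitions.getOrDefault(from, Set.of()).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(TRANSITIONS,
                DatasetStatus.PENDING_REVIEW, DatasetStatus.APPROVED));
        System.out.println(canTransition(TRANSITIONS,
                DatasetStatus.APPROVED, DatasetStatus.REJECTED));
    }
}
```

Keeping terminal states out of the map, as the original enum does with `APPROVED`, means the lookup needs a default of "no targets" rather than a null check at every call site.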

View File

@@ -1,20 +0,0 @@
package com.label.common.statemachine;
import java.util.Map;
import java.util.Set;
public enum VideoJobStatus {
PENDING, RUNNING, SUCCESS, FAILED, RETRYING;
/**
* Automatic state machine transitions.
* Note: FAILED → PENDING is a manual ADMIN operation, handled separately in VideoProcessService.reset().
*/
public static final Map<VideoJobStatus, Set<VideoJobStatus>> TRANSITIONS = Map.of(
PENDING, Set.of(RUNNING),
RUNNING, Set.of(SUCCESS, FAILED, RETRYING),
RETRYING, Set.of(RUNNING, FAILED)
// SUCCESS: terminal state
// FAILED → PENDING: manual ADMIN reset, NOT in this automatic transitions map
);
}

View File

@@ -3,10 +3,10 @@ package com.label.common.statemachine;
import java.util.Map;
import java.util.Set;
public enum SourceStatus {
public enum VideoSourceStatus {
PENDING, PREPROCESSING, EXTRACTING, QA_REVIEW, APPROVED;
public static final Map<SourceStatus, Set<SourceStatus>> TRANSITIONS = Map.of(
public static final Map<VideoSourceStatus, Set<VideoSourceStatus>> TRANSITIONS = Map.of(
PENDING, Set.of(EXTRACTING, PREPROCESSING),
PREPROCESSING, Set.of(PENDING),
EXTRACTING, Set.of(QA_REVIEW),

View File

@@ -0,0 +1,26 @@
package com.label.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("aiTaskExecutor")
public Executor aiTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("ai-annotate-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
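
A small sketch of what `CallerRunsPolicy` means in practice, using the plain JDK `ThreadPoolExecutor` rather than Spring's `ThreadPoolTaskExecutor`. The pool sizes here (one worker, queue of one) are deliberately tiny and are assumptions for the demo, not the values configured above; `CallerRunsSketch` is a hypothetical name. Once the worker and the queue are both full, the next task runs on the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class CallerRunsSketch {
    // Returns true when the overflow task was executed by the submitting thread.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> overflowThread = new AtomicReference<>();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the single queue slot
            // Pool and queue are full, so this task is rejected and run by the caller.
            pool.execute(() -> overflowThread.set(Thread.currentThread()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return overflowThread.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("overflow ran on caller: " + overflowRunsOnCaller());
    }
}
```

For the configuration above this suggests that, once 10 threads are busy and 100 tasks are queued, an `@Async("aiTaskExecutor")` call would likely execute on the calling thread and block it for the duration of the AI request, which trades lost tasks for back-pressure.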

View File

@@ -1,18 +1,26 @@
package com.label.controller;
import java.util.Map;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.label.annotation.RequireRole;
import com.label.common.auth.TokenPrincipal;
import com.label.common.result.Result;
import com.label.dto.RejectRequest;
import com.label.service.ExtractionService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
* Extraction-stage annotation workbench API (5 endpoints).
@@ -25,13 +33,23 @@ public class ExtractionController {
private final ExtractionService extractionService;
/** POST /api/extraction/{taskId}/ai-annotate: AI-assisted pre-annotation */
@Operation(summary = "AI 辅助预标注", description = "调用 AI 服务自动生成预标注结果,可重复调用")
@PostMapping("/{taskId}/ai-annotate")
@RequireRole("ANNOTATOR")
public Result<Void> aiPreAnnotate(
@Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId,
HttpServletRequest request) {
extractionService.aiPreAnnotate(taskId, principal(request));
return Result.success(null);
}
/** GET /api/extraction/{taskId}: fetch the current annotation result */
@Operation(summary = "获取提取标注结果")
@GetMapping("/{taskId}")
@RequireRole("ANNOTATOR")
public Result<Map<String, Object>> getResult(
@Parameter(description = "任务 ID", example = "1001")
@PathVariable Long taskId,
@Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId,
HttpServletRequest request) {
return Result.success(extractionService.getResult(taskId, principal(request)));
}
@@ -41,12 +59,8 @@ public class ExtractionController {
@PutMapping("/{taskId}")
@RequireRole("ANNOTATOR")
public Result<Void> updateResult(
@Parameter(description = "任务 ID", example = "1001")
@PathVariable Long taskId,
@io.swagger.v3.oas.annotations.parameters.RequestBody(
description = "完整提取标注结果 JSON 字符串,保持原始 JSON body 直接提交",
required = true)
@RequestBody String resultJson,
@Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId,
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = "完整提取标注结果 JSON 字符串,保持原始 JSON body 直接提交", required = true) @RequestBody String resultJson,
HttpServletRequest request) {
extractionService.updateResult(taskId, resultJson, principal(request));
return Result.success(null);
@@ -57,8 +71,7 @@ public class ExtractionController {
@PostMapping("/{taskId}/submit")
@RequireRole("ANNOTATOR")
public Result<Void> submit(
@Parameter(description = "任务 ID", example = "1001")
@PathVariable Long taskId,
@Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId,
HttpServletRequest request) {
extractionService.submit(taskId, principal(request));
return Result.success(null);
@@ -69,8 +82,7 @@ public class ExtractionController {
@PostMapping("/{taskId}/approve")
@RequireRole("REVIEWER")
public Result<Void> approve(
@Parameter(description = "任务 ID", example = "1001")
@PathVariable Long taskId,
@Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId,
HttpServletRequest request) {
extractionService.approve(taskId, principal(request));
return Result.success(null);
@@ -81,12 +93,8 @@ public class ExtractionController {
@PostMapping("/{taskId}/reject")
@RequireRole("REVIEWER")
public Result<Void> reject(
@Parameter(description = "任务 ID", example = "1001")
@PathVariable Long taskId,
@io.swagger.v3.oas.annotations.parameters.RequestBody(
description = "驳回提取结果请求体",
required = true)
@RequestBody RejectRequest body,
@Parameter(description = "任务 ID", example = "1001") @PathVariable Long taskId,
@io.swagger.v3.oas.annotations.parameters.RequestBody(description = "驳回提取结果请求体", required = true) @RequestBody RejectRequest body,
HttpServletRequest request) {
String reason = body != null ? body.getReason() : null;
extractionService.reject(taskId, reason, principal(request));

View File

@@ -73,7 +73,7 @@ public class TaskController {
/** GET /api/tasks: list all tasks (ADMIN) */
@Operation(summary = "管理员查询全部任务")
@GetMapping
@RequireRole("ADMIN")
@RequireRole("ANNOTATOR")
public Result<PageResult<TaskResponse>> getAll(
@Parameter(description = "页码,从 1 开始", example = "1")
@RequestParam(defaultValue = "1") int page,

View File

@@ -24,6 +24,8 @@ public class TaskResponse {
private String status;
@Schema(description = "领取人用户 ID", example = "1")
private Long claimedBy;
@Schema(description = "AI 预标注状态PENDING/PROCESSING/COMPLETED/FAILED", example = "COMPLETED")
private String aiStatus;
@Schema(description = "领取时间", example = "2026-04-15T12:34:56")
private LocalDateTime claimedAt;
@Schema(description = "提交时间", example = "2026-04-15T12:34:56")

View File

@@ -53,6 +53,9 @@ public class AnnotationTask {
/** Rejection reason */
private String rejectReason;
/** AI pre-annotation status: PENDING / PROCESSING / COMPLETED / FAILED */
private String aiStatus;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;

View File

@@ -1,43 +1,28 @@
package com.label.listener;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Value;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.label.common.ai.AiServiceClient;
import com.label.common.context.CompanyContext;
import com.label.entity.AnnotationResult;
import com.label.entity.SourceData;
import com.label.entity.TrainingDataset;
import com.label.event.ExtractionApprovedEvent;
import com.label.mapper.AnnotationResultMapper;
import com.label.mapper.SourceDataMapper;
import com.label.mapper.TrainingDatasetMapper;
import com.label.service.TaskService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.label.common.ai.AiServiceClient;
import com.label.common.context.CompanyContext;
import com.label.entity.SourceData;
import com.label.entity.TrainingDataset;
import com.label.event.ExtractionApprovedEvent;
import com.label.mapper.SourceDataMapper;
import com.label.mapper.TrainingDatasetMapper;
import com.label.service.TaskService;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
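/**
* Asynchronous handler that runs after an extraction is approved.
*
* Key design constraints:
* - @TransactionalEventListener(AFTER_COMMIT): ensures the AI call fires only after the approval transaction commits
* - @Transactional(REQUIRES_NEW): writes to the DB in a separate new transaction, fully isolated from the approval transaction
* - An exception does not roll back the approval transaction (already committed), but it is recorded in the log
*
* Processing flow:
* 1. Call the AI to generate candidate QA pairs (Text/Image use different endpoints)
* 2. Write to training_dataset (status=PENDING_REVIEW)
* 3. Create a QA_GENERATION task (status=UNCLAIMED)
* 4. Update the source_data status to QA_REVIEW
*/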
@Slf4j
@Component
@RequiredArgsConstructor
@@ -47,23 +32,19 @@ public class ExtractionApprovedEventListener {
private final SourceDataMapper sourceDataMapper;
private final TaskService taskService;
private final AiServiceClient aiServiceClient;
private final AnnotationResultMapper annotationResultMapper;
private final ObjectMapper objectMapper;
@Value("${rustfs.bucket:label-source-data}")
private String bucket;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void onExtractionApproved(ExtractionApprovedEvent event) {
log.info("处理提取审批通过事件: taskId={}, sourceId={}", event.getTaskId(), event.getSourceId());
// Set the multi-tenant context (the ThreadLocal is already cleared in the new transaction)
CompanyContext.set(event.getCompanyId());
try {
processEvent(event);
} catch (Exception e) {
log.error("处理审批通过事件失败taskId={}{}", event.getTaskId(), e.getMessage(), e);
// Do not rethrow: the approval is already committed, so a failure here must not roll it back
log.error("处理审批通过事件失败(taskId={}): {}", event.getTaskId(), e.getMessage(), e);
} finally {
CompanyContext.clear();
}
@@ -76,57 +57,79 @@ public class ExtractionApprovedEventListener {
return;
}
// 1. Call the AI to generate candidate QA pairs
AiServiceClient.ExtractionRequest req = AiServiceClient.ExtractionRequest.builder()
.sourceId(source.getId())
.filePath(source.getFilePath())
.bucket(bucket)
.build();
List<Map<String, Object>> qaPairs;
try {
AiServiceClient.QaGenResponse response = "IMAGE".equals(source.getDataType())
? aiServiceClient.genImageQa(req)
: aiServiceClient.genTextQa(req);
qaPairs = response != null && response.getQaPairs() != null
? response.getQaPairs()
? aiServiceClient.genImageQa(buildImageQaRequest(event.getTaskId()))
: aiServiceClient.genTextQa(buildTextQaRequest(event.getTaskId()));
qaPairs = response != null && response.getPairs() != null
? response.getPairs()
: Collections.emptyList();
} catch (Exception e) {
log.warn("AI 问答生成失败taskId={}{},将使用空问答对", event.getTaskId(), e.getMessage());
log.warn("AI 问答生成失败(taskId={}): {},将使用空问答对", event.getTaskId(), e.getMessage());
qaPairs = Collections.emptyList();
}
// 2. Write to training_dataset (PENDING_REVIEW)
String sampleType = "IMAGE".equals(source.getDataType()) ? "IMAGE" : "TEXT";
String glmJson = buildGlmJson(qaPairs);
TrainingDataset dataset = new TrainingDataset();
dataset.setCompanyId(event.getCompanyId());
dataset.setTaskId(event.getTaskId());
dataset.setSourceId(event.getSourceId());
dataset.setSampleType(sampleType);
dataset.setGlmFormatJson(glmJson);
dataset.setGlmFormatJson(buildGlmJson(qaPairs));
dataset.setStatus("PENDING_REVIEW");
datasetMapper.insert(dataset);
// 3. Create a QA_GENERATION task (UNCLAIMED)
taskService.createTask(event.getSourceId(), "QA_GENERATION", event.getCompanyId());
// 4. Update the source_data status to QA_REVIEW
sourceDataMapper.updateStatus(event.getSourceId(), "QA_REVIEW", event.getCompanyId());
log.info("审批通过后续处理完成: taskId={}, 新 QA 任务已创建", event.getTaskId());
log.info("审批通过后续处理完成: taskId={}", event.getTaskId());
}
/**
* Convert the AI-generated QA pair list to GLM fine-tune format JSON.
*/
private String buildGlmJson(List<Map<String, Object>> qaPairs) {
try {
return objectMapper.writeValueAsString(Map.of("conversations", qaPairs));
} catch (Exception e) {
log.error("构建 GLM JSON 失败", e);
log.error("构建微调 JSON 失败", e);
return "{\"conversations\":[]}";
}
}
private AiServiceClient.GenTextQaRequest buildTextQaRequest(Long taskId) {
List<AiServiceClient.TextQaItem> items = readAnnotationItems(taskId).stream()
.map(item -> objectMapper.convertValue(item, AiServiceClient.TextQaItem.class))
.toList();
return AiServiceClient.GenTextQaRequest.builder()
.items(items)
.build();
}
private AiServiceClient.GenImageQaRequest buildImageQaRequest(Long taskId) {
List<AiServiceClient.ImageQaItem> items = readAnnotationItems(taskId).stream()
.map(item -> objectMapper.convertValue(item, AiServiceClient.ImageQaItem.class))
.toList();
return AiServiceClient.GenImageQaRequest.builder()
.items(items)
.build();
}
private List<Map<String, Object>> readAnnotationItems(Long taskId) {
AnnotationResult result = annotationResultMapper.selectByTaskId(taskId);
if (result == null || result.getResultJson() == null || result.getResultJson().isBlank()) {
return Collections.emptyList();
}
try {
@SuppressWarnings("unchecked")
Map<String, Object> parsed = objectMapper.readValue(result.getResultJson(), Map.class);
Object items = parsed.get("items");
if (items instanceof List<?>) {
@SuppressWarnings("unchecked")
List<Map<String, Object>> typedItems = (List<Map<String, Object>>) items;
return typedItems;
}
} catch (Exception e) {
log.warn("解析提取结果失败taskId={},将使用空 items: {}", taskId, e.getMessage());
}
return Collections.emptyList();
}
}

View File

@@ -33,4 +33,9 @@ public interface AnnotationResultMapper extends BaseMapper<AnnotationResult> {
*/
@Select("SELECT * FROM annotation_result WHERE task_id = #{taskId}")
AnnotationResult selectByTaskId(@Param("taskId") Long taskId);
@Insert("INSERT INTO annotation_result (task_id, company_id, result_json, created_at, updated_at) " +
"VALUES (#{taskId}, #{companyId}, #{resultJson}::jsonb, NOW(), NOW())")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
int insertWithJsonb(AnnotationResult result);
}

View File

@@ -0,0 +1,143 @@
package com.label.service;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.label.common.ai.AiServiceClient;
import com.label.common.context.CompanyContext;
import com.label.entity.AnnotationResult;
import com.label.entity.AnnotationTask;
import com.label.entity.SourceData;
import com.label.mapper.AnnotationResultMapper;
import com.label.mapper.AnnotationTaskMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Service
@RequiredArgsConstructor
public class AiAnnotationAsyncService {
private final AnnotationTaskMapper taskMapper;
private final ObjectMapper objectMapper;
private final AnnotationResultMapper resultMapper;
private final AiServiceClient aiServiceClient;
@Async("aiTaskExecutor")
public void processAnnotation(Long taskId, Long companyId, SourceData source) {
CompanyContext.set(companyId);
log.info("开始异步执行 AI 预标注任务ID: {}", taskId);
String dataType = source.getDataType().toUpperCase();
AiServiceClient.ExtractionResponse aiResponse = null;
int maxRetries = 2;
Exception lastException = null;
String finalStatus = "FAILED";
try {
for (int attempt = 1; attempt <= maxRetries; attempt++) {
try {
if ("IMAGE".equals(dataType)) {
AiServiceClient.ImageExtractRequest req = AiServiceClient.ImageExtractRequest.builder()
.filePath(source.getFilePath())
.taskId(taskId)
.build();
aiResponse = aiServiceClient.extractImage(req);
} else {
AiServiceClient.TextExtractRequest req = AiServiceClient.TextExtractRequest.builder()
.filePath(source.getFilePath())
.fileName(source.getFileName())
.build();
aiResponse = aiServiceClient.extractText(req);
}
if (aiResponse != null) {
log.info("AI 预标注成功任务ID: {}, 尝试次数: {}", taskId, attempt);
break;
}
} catch (Exception e) {
lastException = e;
log.warn("AI 预标注调用失败(任务 {}),第 {} 次尝试:{}", taskId, attempt, e.getMessage());
if (attempt < maxRetries) {
try {
Thread.sleep(1000L * attempt);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
List<?> items = Collections.emptyList();
if (aiResponse != null && aiResponse.getItems() != null) {
items = aiResponse.getItems();
}
writeOrUpdateResult(taskId, companyId, items);
finalStatus = "COMPLETED";
} catch (Exception e) {
lastException = e;
log.error("AI 预标注处理过程中发生未知异常任务ID: {}", taskId, e);
finalStatus = "FAILED";
} finally {
try {
AnnotationTask updateEntity = new AnnotationTask();
updateEntity.setId(taskId);
updateEntity.setAiStatus(finalStatus);
if ("FAILED".equals(finalStatus)) {
String reason = lastException != null ? lastException.getMessage() : "AI处理失败";
if (reason != null && reason.length() > 500) {
reason = reason.substring(0, 500);
}
updateEntity.setRejectReason(reason);
}
int rows = taskMapper.updateById(updateEntity);
log.info("异步 AI 预标注结束任务ID: {}, 最终状态: {}, row {}", taskId, finalStatus, rows);
} catch (Exception updateEx) {
log.error("更新任务 AI 状态失败任务ID: {}", taskId, updateEx);
} finally {
CompanyContext.clear();
}
}
}
private void writeOrUpdateResult(Long taskId, Long companyId, List<?> items) {
try {
String json = objectMapper
.writeValueAsString(Map.of("items", items != null ? items : Collections.emptyList()));
int updated = resultMapper.updateResultJson(taskId, json, companyId);
if (updated == 0) {
try {
AnnotationResult result = new AnnotationResult();
result.setTaskId(taskId);
result.setCompanyId(companyId);
result.setResultJson(json);
resultMapper.insertWithJsonb(result);
log.info("新建AI预标注结果任务ID: {}", taskId);
} catch (Exception insertEx) {
if (insertEx.getMessage() != null && insertEx.getMessage().contains("duplicate key")) {
log.warn("检测到并发插入冲突转为更新模式任务ID: {}", taskId);
resultMapper.updateResultJson(taskId, json, companyId);
} else {
throw insertEx;
}
}
} else {
log.info("更新AI预标注结果任务ID: {}", taskId);
}
} catch (Exception e) {
log.error("写入 AI 预标注结果失败, taskId={}", taskId, e);
throw new RuntimeException("RESULT_WRITE_FAILED: " + e.getMessage(), e);
}
}
}
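
A hedged sketch of the retry-with-linear-backoff pattern used in `processAnnotation`, pulled out into a generic helper. `RetrySketch`, `callWithRetry` and `demoAttempts` are hypothetical names and not part of this commit. One deliberate difference: the sketch rethrows the last exception once attempts are exhausted, whereas the loop above appears to fall through, write an empty item list and report `COMPLETED` even when every attempt threw.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

final class RetrySketch {
    // Hypothetical helper: retries the call, sleeping baseDelayMillis * attempt between tries.
    static <T> T callWithRetry(Callable<T> call, int maxAttempts, long baseDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(baseDelayMillis * attempt); // linear backoff
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // restore the flag and stop retrying
                        break;
                    }
                }
            }
        }
        throw last; // surfaces the failure so the caller can record a FAILED status
    }

    // Demo: the call fails twice, then succeeds on the third attempt.
    static int demoAttempts() {
        AtomicInteger attempts = new AtomicInteger();
        try {
            callWithRetry(() -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new IllegalStateException("transient failure");
                }
                return "ok";
            }, 3, 1L);
        } catch (Exception e) {
            return -1;
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println("attempts used: " + demoAttempts());
    }
}
```

Whether an exhausted retry should end in `FAILED` or in `COMPLETED` with empty items is a product decision; the sketch only shows how the two outcomes can be kept distinguishable.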

View File

@@ -1,33 +1,30 @@
package com.label.service;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.label.common.ai.AiServiceClient;
import com.label.common.exception.BusinessException;
import com.label.common.auth.TokenPrincipal;
import com.label.common.statemachine.StateValidator;
import com.label.common.statemachine.TaskStatus;
import com.label.entity.AnnotationResult;
import com.label.entity.TrainingDataset;
import com.label.event.ExtractionApprovedEvent;
import com.label.mapper.AnnotationResultMapper;
import com.label.mapper.TrainingDatasetMapper;
import com.label.entity.SourceData;
import com.label.mapper.SourceDataMapper;
import com.label.entity.AnnotationTask;
import com.label.mapper.AnnotationTaskMapper;
import com.label.service.TaskClaimService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Map;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.label.common.auth.TokenPrincipal;
import com.label.common.exception.BusinessException;
import com.label.common.statemachine.StateValidator;
import com.label.common.statemachine.TaskStatus;
import com.label.entity.AnnotationResult;
import com.label.entity.AnnotationTask;
import com.label.entity.SourceData;
import com.label.event.ExtractionApprovedEvent;
import com.label.mapper.AnnotationResultMapper;
import com.label.mapper.AnnotationTaskMapper;
import com.label.mapper.SourceDataMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* Extraction-stage annotation service: AI pre-annotation, result updates, submission, approval, rejection.
@@ -43,12 +40,13 @@ public class ExtractionService {
private final AnnotationTaskMapper taskMapper;
private final AnnotationResultMapper resultMapper;
private final TrainingDatasetMapper datasetMapper;
// private final TrainingDatasetMapper datasetMapper;
private final SourceDataMapper sourceDataMapper;
private final TaskClaimService taskClaimService;
private final AiServiceClient aiServiceClient;
// private final AiServiceClient aiServiceClient;
private final ApplicationEventPublisher eventPublisher;
private final ObjectMapper objectMapper;
private final AiAnnotationAsyncService aiAnnotationAsyncService; // inject the async service
@Value("${rustfs.bucket:label-source-data}")
private String bucket;
@@ -67,32 +65,30 @@ public class ExtractionService {
throw new BusinessException("NOT_FOUND", "关联资料不存在", HttpStatus.NOT_FOUND);
}
// Call the AI service (outside the transaction, to avoid holding a DB connection for a long time)
AiServiceClient.ExtractionRequest req = AiServiceClient.ExtractionRequest.builder()
.sourceId(source.getId())
.filePath(source.getFilePath())
.bucket(bucket)
.build();
AiServiceClient.ExtractionResponse aiResponse;
try {
if ("IMAGE".equals(source.getDataType())) {
aiResponse = aiServiceClient.extractImage(req);
} else {
aiResponse = aiServiceClient.extractText(req);
}
} catch (Exception e) {
log.warn("AI 预标注调用失败(任务 {}{}", taskId, e.getMessage());
// An AI failure does not block the flow; write an empty result
aiResponse = new AiServiceClient.ExtractionResponse();
aiResponse.setItems(Collections.emptyList());
if (source.getFilePath() == null || source.getFilePath().isEmpty()) {
throw new BusinessException("INVALID_SOURCE", "源文件路径不能为空", HttpStatus.BAD_REQUEST);
}
// Write the AI result to annotation_result (UPSERT semantics)
writeOrUpdateResult(taskId, principal.getCompanyId(), aiResponse.getItems());
if (source.getDataType() == null || source.getDataType().isEmpty()) {
throw new BusinessException("INVALID_SOURCE", "数据类型不能为空", HttpStatus.BAD_REQUEST);
}
// ------------------------------------------------------------------ Update result --
String dataType = source.getDataType().toUpperCase();
if (!"IMAGE".equals(dataType) && !"TEXT".equals(dataType)) {
log.warn("不支持的数据类型: {}, 任务ID: {}", dataType, taskId);
throw new BusinessException("UNSUPPORTED_TYPE",
"不支持的数据类型: " + dataType, HttpStatus.BAD_REQUEST);
}
// Set the task's AI status to PROCESSING
taskMapper.update(null, new LambdaUpdateWrapper<AnnotationTask>()
.eq(AnnotationTask::getId, taskId)
.set(AnnotationTask::getAiStatus, "PROCESSING"));
// Trigger the async task
aiAnnotationAsyncService.processAnnotation(taskId, principal.getCompanyId(), source);
// executeAiAnnotationAsync(taskId, principal.getCompanyId(), source);
}
/**
* Manually update the annotation result (full overwrite, PUT semantics).
@@ -237,8 +233,7 @@ public class ExtractionService {
"sourceType", source != null ? source.getDataType() : "",
"sourceFilePath", source != null && source.getFilePath() != null ? source.getFilePath() : "",
"isFinal", task.getIsFinal() != null && task.getIsFinal(),
"resultJson", result != null ? result.getResultJson() : "[]"
);
"resultJson", result != null ? result.getResultJson() : "[]");
}
// ------------------------------------------------------------------ Private helpers --
@@ -253,20 +248,4 @@ public class ExtractionService {
}
return task;
}
private void writeOrUpdateResult(Long taskId, Long companyId, java.util.List<?> items) {
try {
String json = objectMapper.writeValueAsString(Map.of("items", items != null ? items : Collections.emptyList()));
int updated = resultMapper.updateResultJson(taskId, json, companyId);
if (updated == 0) {
AnnotationResult result = new AnnotationResult();
result.setTaskId(taskId);
result.setCompanyId(companyId);
result.setResultJson(json);
resultMapper.insert(result);
}
} catch (Exception e) {
log.error("写入 AI 预标注结果失败: taskId={}", taskId, e);
}
}
}

View File

@@ -1,74 +1,73 @@
package com.label.service;
import com.label.common.ai.AiServiceClient;
import com.label.common.exception.BusinessException;
import com.label.common.auth.TokenPrincipal;
import com.label.common.exception.BusinessException;
import com.label.common.storage.RustFsClient;
import com.label.entity.ExportBatch;
import com.label.mapper.ExportBatchMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Map;
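/**
* GLM fine-tuning service: submit jobs and query status.
*
* Note: trigger() makes an AI HTTP call and is not annotated with @Transactional.
* A transaction is opened only for the DB write (updateFinetuneInfo).
*/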
@Slf4j
@Service
@RequiredArgsConstructor
public class FinetuneService {
private static final String FINETUNE_BUCKET = "finetune-export";
private static final int PRESIGNED_URL_MINUTES = 60;
private final ExportBatchMapper exportBatchMapper;
private final ExportService exportService;
private final AiServiceClient aiServiceClient;
private final RustFsClient rustFsClient;
// ------------------------------------------------------------------ Submit fine-tune --
private String finetuneBaseModel = "qwen3-14b";
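/**
* Submit a fine-tuning job to the GLM AI service.
*
* T074 design: the AI call runs outside @Transactional, so no HTTP request is made while a DB connection is held.
* The DB write (updateFinetuneInfo) is a single UPDATE and needs no explicit transaction (auto-commit).
* If the AI call succeeds but the DB write fails, the next status query can still rebuild the state from the AI service's jobId.
*
* @param batchId batch ID
* @param principal current user
* @return Map containing glmJobId and finetuneStatus
*/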
public Map<String, Object> trigger(Long batchId, TokenPrincipal principal) {
ExportBatch batch = exportService.getById(batchId, principal);
if (!"NOT_STARTED".equals(batch.getFinetuneStatus())) {
throw new BusinessException("FINETUNE_ALREADY_STARTED",
"微调任务已提交,当前状态: " + batch.getFinetuneStatus(), HttpStatus.CONFLICT);
throw new BusinessException(
"FINETUNE_ALREADY_STARTED",
"微调任务已提交,当前状态 " + batch.getFinetuneStatus(),
HttpStatus.CONFLICT
);
}
// Call the AI service (no transaction, no DB connection held)
AiServiceClient.FinetuneRequest req = AiServiceClient.FinetuneRequest.builder()
.datasetPath(batch.getDatasetFilePath())
.model("glm-4")
.batchId(batchId)
String jsonlUrl = rustFsClient.getPresignedUrl(
FINETUNE_BUCKET,
batch.getDatasetFilePath(),
PRESIGNED_URL_MINUTES
);
AiServiceClient.FinetuneStartRequest req = AiServiceClient.FinetuneStartRequest.builder()
.jsonlUrl(jsonlUrl)
.baseModel(finetuneBaseModel)
.hyperparams(Map.of())
.build();
AiServiceClient.FinetuneResponse response;
AiServiceClient.FinetuneStartResponse response;
try {
response = aiServiceClient.startFinetune(req);
} catch (Exception e) {
throw new BusinessException("FINETUNE_TRIGGER_FAILED",
"提交微调任务失败: " + e.getMessage(), HttpStatus.SERVICE_UNAVAILABLE);
throw new BusinessException(
"FINETUNE_TRIGGER_FAILED",
"提交微调任务失败: " + e.getMessage(),
HttpStatus.SERVICE_UNAVAILABLE
);
}
// After the AI call succeeds, update the batch record (single UPDATE, auto-commit)
exportBatchMapper.updateFinetuneInfo(batchId,
response.getJobId(), "RUNNING", principal.getCompanyId());
log.info("微调任务已提交: batchId={}, glmJobId={}", batchId, response.getJobId());
exportBatchMapper.updateFinetuneInfo(
batchId,
response.getJobId(),
"RUNNING",
principal.getCompanyId()
);
return Map.of(
"glmJobId", response.getJobId(),
@@ -76,15 +75,6 @@ public class FinetuneService {
);
}
// ------------------------------------------------------------------ Query status --
/**
* Query the live status of a fine-tuning job (asks the AI service).
*
* @param batchId batch ID
* @param principal current user
* @return status Map
*/
public Map<String, Object> getStatus(Long batchId, TokenPrincipal principal) {
ExportBatch batch = exportService.getById(batchId, principal);
@@ -98,13 +88,11 @@ public class FinetuneService {
);
}
// Query the AI service in real time
AiServiceClient.FinetuneStatusResponse statusResp;
try {
statusResp = aiServiceClient.getFinetuneStatus(batch.getGlmJobId());
} catch (Exception e) {
log.warn("查询微调状态失败batchId={}{}", batchId, e.getMessage());
// On query failure, return the status cached in the DB
log.warn("查询微调状态失败(batchId={}): {}", batchId, e.getMessage());
return Map.of(
"batchId", batchId,
"glmJobId", batch.getGlmJobId(),

View File

@@ -190,6 +190,7 @@ public class TaskService {
.sourceId(task.getSourceId())
.taskType(task.getTaskType())
.status(task.getStatus())
.aiStatus(task.getAiStatus())
.claimedBy(task.getClaimedBy())
.claimedAt(task.getClaimedAt())
.submittedAt(task.getSubmittedAt())

View File

@@ -1,17 +1,18 @@
package com.label.service;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.label.common.ai.AiServiceClient;
import com.label.common.exception.BusinessException;
import com.label.common.statemachine.SourceStatus;
import com.label.common.statemachine.StateValidator;
import com.label.common.statemachine.VideoSourceStatus;
import com.label.entity.SourceData;
import com.label.mapper.SourceDataMapper;
import com.label.entity.VideoProcessJob;
import com.label.mapper.SourceDataMapper;
import com.label.mapper.VideoProcessJobMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -21,20 +22,6 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import java.time.LocalDateTime;
import java.util.Map;
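/**
* Video processing service: create jobs, handle callbacks, admin reset.
*
* State transitions:
* - On creation: source_data → PREPROCESSING, job → PENDING
* - Callback success: job → SUCCESS, source_data → PENDING (enters the extraction queue)
* - Callback failure (retryable): job → RETRYING, retryCount++, AI re-triggered
* - Callback failure (retry limit exceeded): job → FAILED, source_data → PENDING
* - Admin reset: job → PENDING (can be re-triggered manually)
*
* T074 design note:
* The AI call is deferred via TransactionSynchronizationManager.registerSynchronization().afterCommit()
* until after the transaction commits, to avoid making HTTP calls while holding a DB connection.
*/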
@Slf4j
@Service
@RequiredArgsConstructor
@@ -43,44 +30,27 @@ public class VideoProcessService {
private final VideoProcessJobMapper jobMapper;
private final SourceDataMapper sourceDataMapper;
private final AiServiceClient aiServiceClient;
private final ObjectMapper objectMapper;
@Value("${rustfs.bucket:label-source-data}")
private String bucket;
// ------------------------------------------------------------------ Create job --
/**
* Create a video processing job and trigger the AI service after the transaction commits.
*
* The DB writes (source_data → PREPROCESSING plus the job insert) complete inside @Transactional;
* the AI trigger runs via afterCommit() after the commit and holds no DB connection.
*
* @param sourceId source data ID
* @param jobType job type (FRAME_EXTRACT / VIDEO_TO_TEXT)
* @param params JSON parameters (e.g. {"frameInterval": 30})
* @param companyId tenant ID
* @return the newly created VideoProcessJob
*/
@Transactional
public VideoProcessJob createJob(Long sourceId, String jobType,
String params, Long companyId) {
public VideoProcessJob createJob(Long sourceId, String jobType, String params, Long companyId) {
SourceData source = sourceDataMapper.selectById(sourceId);
if (source == null || !companyId.equals(source.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "资料不存在: " + sourceId, HttpStatus.NOT_FOUND);
throw new BusinessException("NOT_FOUND", "资料不存在 " + sourceId, HttpStatus.NOT_FOUND);
}
validateJobType(jobType);
// source_data → PREPROCESSING
StateValidator.assertTransition(
SourceStatus.TRANSITIONS,
SourceStatus.valueOf(source.getStatus()), SourceStatus.PREPROCESSING);
VideoSourceStatus.TRANSITIONS,
VideoSourceStatus.valueOf(source.getStatus()),
VideoSourceStatus.PREPROCESSING
);
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, sourceId)
.set(SourceData::getStatus, "PREPROCESSING")
.set(SourceData::getUpdatedAt, LocalDateTime.now()));
// Insert the PENDING job
VideoProcessJob job = new VideoProcessJob();
job.setCompanyId(companyId);
job.setSourceId(sourceId);
@@ -91,48 +61,32 @@ public class VideoProcessService {
job.setMaxRetries(3);
jobMapper.insert(job);
// Trigger AI after the transaction commits (outside the transaction, no DB connection held)
final Long jobId = job.getId();
final String filePath = source.getFilePath();
final String finalJobType = jobType;
final String finalParams = job.getParams();
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
triggerAi(jobId, sourceId, filePath, finalJobType);
triggerAi(jobId, sourceId, filePath, finalJobType, finalParams);
}
});
log.info("视频处理任务已创建AI 将在事务提交后触发): jobId={}, sourceId={}", jobId, sourceId);
log.info("视频处理任务已创建: jobId={}, sourceId={}", jobId, sourceId);
return job;
}
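// ------------------------------------------------------------------ Handle callback --
/**
* Handles the asynchronous callback from the AI service (POST /api/video/callback; no user token required).
*
* Idempotent: if the job is already SUCCESS, returns immediately to avoid duplicate processing.
* Retry triggers are likewise deferred until after the transaction commits (afterCommit), not run inside it.
*
* @param jobId job ID
* @param callbackStatus AI callback status: SUCCESS / FAILED
* @param outputPath output path on success (optional)
* @param errorMessage error message on failure (optional)
*/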
@Transactional
public void handleCallback(Long jobId, String callbackStatus,
String outputPath, String errorMessage) {
// video_process_job is in IGNORED_TABLES (the callback has no CompanyContext), so validate explicitly here
public void handleCallback(Long jobId, String callbackStatus, String outputPath, String errorMessage) {
VideoProcessJob job = jobMapper.selectById(jobId);
if (job == null || job.getCompanyId() == null) {
log.warn("视频处理回调job 不存在jobId={}", jobId);
log.warn("视频处理回调job 不存在: jobId={}", jobId);
return;
}
// Idempotent: ignore duplicate callbacks once the job has succeeded
if ("SUCCESS".equals(job.getStatus())) {
log.info("视频处理回调幂等jobId={} 已为 SUCCESS跳过", jobId);
log.info("视频处理回调幂等跳过: jobId={}", jobId);
return;
}
@@ -143,28 +97,19 @@ public class VideoProcessService {
}
}
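// ------------------------------------------------------------------ Admin reset --
/**
* Manually resets a failed job as an administrator (FAILED → PENDING).
*
* Only jobs in FAILED status can be reset; retryCount is cleared on reset,
* and the administrator can then call createJob again to trigger processing.
*
* @param jobId job ID
* @param companyId tenant ID
*/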
@Transactional
public VideoProcessJob reset(Long jobId, Long companyId) {
VideoProcessJob job = jobMapper.selectById(jobId);
if (job == null || !companyId.equals(job.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND);
throw new BusinessException("NOT_FOUND", "视频处理任务不存在 " + jobId, HttpStatus.NOT_FOUND);
}
if (!"FAILED".equals(job.getStatus())) {
throw new BusinessException("INVALID_TRANSITION",
"只有 FAILED 状态的任务可以重置,当前状态: " + job.getStatus(),
HttpStatus.BAD_REQUEST);
throw new BusinessException(
"INVALID_TRANSITION",
"只有 FAILED 状态的任务可以重置,当前状态 " + job.getStatus(),
HttpStatus.BAD_REQUEST
);
}
jobMapper.update(null, new LambdaUpdateWrapper<VideoProcessJob>()
@@ -176,24 +121,18 @@ public class VideoProcessService {
job.setStatus("PENDING");
job.setRetryCount(0);
log.info("视频处理任务已重置: jobId={}", jobId);
return job;
}
// ------------------------------------------------------------------ Query --
public VideoProcessJob getJob(Long jobId, Long companyId) {
VideoProcessJob job = jobMapper.selectById(jobId);
if (job == null || !companyId.equals(job.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND);
throw new BusinessException("NOT_FOUND", "视频处理任务不存在 " + jobId, HttpStatus.NOT_FOUND);
}
return job;
}
// ------------------------------------------------------------------ Private methods --
private void handleSuccess(VideoProcessJob job, String outputPath) {
// job → SUCCESS
jobMapper.update(null, new LambdaUpdateWrapper<VideoProcessJob>()
.eq(VideoProcessJob::getId, job.getId())
.set(VideoProcessJob::getStatus, "SUCCESS")
@@ -201,13 +140,10 @@ public class VideoProcessService {
.set(VideoProcessJob::getCompletedAt, LocalDateTime.now())
.set(VideoProcessJob::getUpdatedAt, LocalDateTime.now()));
// source_data PREPROCESSING → PENDING (enters the extraction queue)
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, job.getSourceId())
.set(SourceData::getStatus, "PENDING")
.set(SourceData::getUpdatedAt, LocalDateTime.now()));
log.info("视频处理成功jobId={}, sourceId={}", job.getId(), job.getSourceId());
}
private void handleFailure(VideoProcessJob job, String errorMessage) {
@@ -215,7 +151,6 @@ public class VideoProcessService {
int maxRetries = job.getMaxRetries() != null ? job.getMaxRetries() : 3;
if (newRetryCount < maxRetries) {
// Retries remain: job → RETRYING, re-trigger AI after the transaction commits
jobMapper.update(null, new LambdaUpdateWrapper<VideoProcessJob>()
.eq(VideoProcessJob::getId, job.getId())
.set(VideoProcessJob::getStatus, "RETRYING")
@@ -223,26 +158,22 @@ public class VideoProcessService {
.set(VideoProcessJob::getErrorMessage, errorMessage)
.set(VideoProcessJob::getUpdatedAt, LocalDateTime.now()));
log.warn("视频处理失败,开始第 {} 次重试jobId={}, error={}",
newRetryCount, job.getId(), errorMessage);
// Defer the retry AI trigger until after the transaction commits
SourceData source = sourceDataMapper.selectById(job.getSourceId());
if (source != null) {
final Long jobId = job.getId();
final Long sourceId = job.getSourceId();
final String filePath = source.getFilePath();
final String jobType = job.getJobType();
final String params = job.getParams();
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
triggerAi(jobId, sourceId, filePath, jobType);
triggerAi(jobId, sourceId, filePath, jobType, params);
}
});
}
} else {
// Max retries exceeded: job → FAILED, source_data → PENDING
jobMapper.update(null, new LambdaUpdateWrapper<VideoProcessJob>()
.eq(VideoProcessJob::getId, job.getId())
.set(VideoProcessJob::getStatus, "FAILED")
@@ -251,40 +182,87 @@ public class VideoProcessService {
.set(VideoProcessJob::getCompletedAt, LocalDateTime.now())
.set(VideoProcessJob::getUpdatedAt, LocalDateTime.now()));
// source_data PREPROCESSING → PENDING (an administrator can reprocess it)
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, job.getSourceId())
.set(SourceData::getStatus, "PENDING")
.set(SourceData::getUpdatedAt, LocalDateTime.now()));
log.error("视频处理永久失败jobId={}, sourceId={}, error={}",
job.getId(), job.getSourceId(), errorMessage);
}
}
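The retry branch in `handleFailure` can be sketched as a small pure function. This is only an illustration, not the service's code: `RetryDecision` and `nextStatus` are hypothetical names, and the sketch assumes `newRetryCount` is the stored `retryCount` plus one (its computation lies outside the hunk shown; the class Javadoc only says `retryCount++`).

```java
// Hypothetical helper, not part of VideoProcessService: it only mirrors the
// branch condition visible in handleFailure.
final class RetryDecision {

    private RetryDecision() {
    }

    /**
     * Returns the job status after a failed callback.
     * Assumption: the new retry count is the stored count plus one.
     */
    static String nextStatus(int retryCount, Integer maxRetries) {
        int limit = maxRetries != null ? maxRetries : 3; // same fallback as the service
        int newRetryCount = retryCount + 1;
        return newRetryCount < limit ? "RETRYING" : "FAILED";
    }

    public static void main(String[] args) {
        // Walk through three consecutive failures with the default limit.
        for (int count = 0; count < 3; count++) {
            System.out.println(count + " -> " + nextStatus(count, 3));
        }
    }
}
```

Under that assumption and with `maxRetries = 3`, the first two failures move the job to RETRYING and the third moves it to FAILED. If each trigger produces exactly one callback, the AI service is therefore invoked at most three times per job: the initial trigger plus two retries.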
private void triggerAi(Long jobId, Long sourceId, String filePath, String jobType) {
AiServiceClient.VideoProcessRequest req = AiServiceClient.VideoProcessRequest.builder()
.sourceId(sourceId)
.filePath(filePath)
.bucket(bucket)
.params(Map.of("jobId", jobId, "jobType", jobType))
.build();
private void triggerAi(Long jobId, Long sourceId, String filePath, String jobType, String paramsJson) {
Map<String, Object> params = parseParams(paramsJson);
try {
if ("FRAME_EXTRACT".equals(jobType)) {
aiServiceClient.extractFrames(req);
aiServiceClient.extractFrames(AiServiceClient.ExtractFramesRequest.builder()
.filePath(filePath)
.sourceId(sourceId)
.jobId(jobId)
.mode(stringParam(params, "mode", "interval"))
.frameInterval(intParam(params, "frameInterval", 30))
.build());
} else {
aiServiceClient.videoToText(req);
aiServiceClient.videoToText(AiServiceClient.VideoToTextRequest.builder()
.filePath(filePath)
.sourceId(sourceId)
.jobId(jobId)
.startSec(doubleParam(params, "startSec", 0.0))
.endSec(doubleParam(params, "endSec", 120.0))
.model(stringParam(params, "model", null))
.promptTemplate(stringParam(params, "promptTemplate", null))
.build());
}
log.info("AI 触发成功: jobId={}", jobId);
log.info("AI 视频任务已触发: jobId={}", jobId);
} catch (Exception e) {
log.error("触发视频处理 AI 失败jobId={}{}job 保持当前状态,需管理员手动重置", jobId, e.getMessage());
log.error("触发视频处理 AI 失败(jobId={}): {}", jobId, e.getMessage());
}
}
private Map<String, Object> parseParams(String paramsJson) {
if (paramsJson == null || paramsJson.isBlank()) {
return Map.of();
}
try {
return objectMapper.readValue(paramsJson, new TypeReference<>() {});
} catch (Exception e) {
log.warn("解析视频处理参数失败,将使用默认值: {}", e.getMessage());
return Map.of();
}
}
private String stringParam(Map<String, Object> params, String key, String defaultValue) {
Object value = params.get(key);
return value == null ? defaultValue : String.valueOf(value);
}
private Integer intParam(Map<String, Object> params, String key, Integer defaultValue) {
Object value = params.get(key);
if (value instanceof Number number) {
return number.intValue();
}
if (value instanceof String text && !text.isBlank()) {
return Integer.parseInt(text);
}
return defaultValue;
}
private Double doubleParam(Map<String, Object> params, String key, Double defaultValue) {
Object value = params.get(key);
if (value instanceof Number number) {
return number.doubleValue();
}
if (value instanceof String text && !text.isBlank()) {
return Double.parseDouble(text);
}
return defaultValue;
}
private void validateJobType(String jobType) {
if (!"FRAME_EXTRACT".equals(jobType) && !"VIDEO_TO_TEXT".equals(jobType)) {
throw new BusinessException("INVALID_JOB_TYPE",
"任务类型不合法,应为 FRAME_EXTRACT 或 VIDEO_TO_TEXT", HttpStatus.BAD_REQUEST);
throw new BusinessException(
"INVALID_JOB_TYPE",
"任务类型不合法,应为 FRAME_EXTRACT 或 VIDEO_TO_TEXT",
HttpStatus.BAD_REQUEST
);
}
}
}
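The default-handling done by `intParam` and `doubleParam` above can be tried in isolation. The sketch below is a standalone approximation: the class name `ParamDefaults` is hypothetical, the method bodies follow the diff but use explicit casts instead of pattern matching, and the JSON parsing step (`parseParams`) is left out so that only the JDK is needed.

```java
import java.util.Map;

// Hypothetical standalone class; the logic follows intParam/doubleParam from
// the diff, minus the Jackson-based JSON parsing.
final class ParamDefaults {

    private ParamDefaults() {
    }

    /** Accepts a JSON number or a numeric string; falls back to the default when absent. */
    static Integer intParam(Map<String, Object> params, String key, Integer defaultValue) {
        Object value = params.get(key);
        if (value instanceof Number) {
            return ((Number) value).intValue(); // fractional values are truncated
        }
        if (value instanceof String && !((String) value).isBlank()) {
            return Integer.parseInt((String) value); // NumberFormatException if not numeric
        }
        return defaultValue;
    }

    /** Same rules as intParam, but for floating-point parameters. */
    static Double doubleParam(Map<String, Object> params, String key, Double defaultValue) {
        Object value = params.get(key);
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        if (value instanceof String && !((String) value).isBlank()) {
            return Double.parseDouble((String) value);
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> params = Map.of("frameInterval", "15", "startSec", 2);
        System.out.println(intParam(params, "frameInterval", 30));
        System.out.println(doubleParam(params, "startSec", 0.0));
        System.out.println(doubleParam(params, "endSec", 120.0)); // key absent, default used
    }
}
```

Two behaviours are worth noting. A fractional number passed for an integer parameter is truncated rather than rejected. A non-numeric string makes `Integer.parseInt` or `Double.parseDouble` throw `NumberFormatException`; in the service these helpers are called inside the `try` block of `triggerAi`, so such an error is logged there and the AI request is not sent.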

View File

@@ -1,11 +1,13 @@
server:
port: 18082
servlet:
context-path: /label
spring:
application:
name: label-backend
servlet:
multipart:
max-file-size: 500MB
max-request-size: 500MB
datasource:
url: ${SPRING_DATASOURCE_URL:jdbc:postgresql://39.107.112.174:5432/labeldb}
username: ${SPRING_DATASOURCE_USERNAME:postgres}
@@ -62,11 +64,12 @@ rustfs:
region: us-east-1
ai-service:
base-url: ${AI_SERVICE_BASE_URL:http://http://172.28.77.215:18000}
timeout: 30000
base-url: ${AI_SERVICE_BASE_URL:http://172.28.77.215:18000}
#base-url: ${AI_SERVICE_BASE_URL:http://127.0.0.1:18000}
timeout: 300000
auth:
enabled: false
enabled: true
mock-company-id: 1
mock-user-id: 1
mock-role: ADMIN

View File

@@ -87,6 +87,7 @@ CREATE TABLE IF NOT EXISTS annotation_task (
completed_at TIMESTAMP,
is_final BOOLEAN NOT NULL DEFAULT FALSE, -- true 即 APPROVED 且无需再审
ai_model VARCHAR(50),
ai_status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
reject_reason TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
@@ -313,7 +314,7 @@ INSERT INTO sys_config (company_id, config_key, config_value, description)
VALUES
(NULL, 'token_ttl_seconds', '7200',
'会话凭证有效期(秒)'),
(NULL, 'model_default', 'glm-4',
(NULL, 'model_default', 'qwen-plus',
'AI 辅助默认模型'),
(NULL, 'video_frame_interval', '30',
'视频帧提取间隔(帧数)'),