Files
label_backend/docs/superpowers/specs/2026-04-09-label-backend-design.md

1830 lines
83 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# label_backend 开发实施指南
**版本1.0.0 | 日期2026-04-09 | 依据backend后台设计.md v2.2 + constitution.md v1.1.0**
---
## 目录
- [一、项目总览](#一项目总览)
- [1.1 系统定位](#11-系统定位)
- [1.2 数据流水线](#12-数据流水线)
- [1.3 技术栈矩阵](#13-技术栈矩阵)
- [1.4 后端模块清单](#14-后端模块清单)
- [1.5 包结构](#15-包结构)
- [二、数据库 DDL](#二数据库-ddl)
- [三、公共基础设施](#三公共基础设施)
- [3.1 统一响应封装](#31-统一响应封装)
- [3.2 全局异常处理](#32-全局异常处理)
- [3.3 多租户 CompanyContextThreadLocal](#33-多租户-companycontextthreadlocal)
- [3.4 Shiro 配置](#34-shiro-配置)
- [3.5 Redis Key 管理](#35-redis-key-管理)
- [3.6 AOP 审计日志切面](#36-aop-审计日志切面)
- [3.7 RustFS S3 客户端](#37-rustfs-s3-客户端)
- [3.8 AI 服务 HTTP 客户端](#38-ai-服务-http-客户端)
- [四、业务模块纵切](#四业务模块纵切)
- [4.1 用户与权限模块](#41-用户与权限模块)
- [4.2 资料管理模块](#42-资料管理模块)
- [4.3 任务管理模块(含并发控制)](#43-任务管理模块含并发控制)
- [4.4 标注工作台模块EXTRACTION 阶段)](#44-标注工作台模块extraction-阶段)
- [4.5 问答生成模块QA_GENERATION 阶段)](#45-问答生成模块qa_generation-阶段)
- [4.6 训练数据导出模块](#46-训练数据导出模块)
- [4.7 系统配置模块](#47-系统配置模块)
- [4.8 视频处理模块](#48-视频处理模块)
- [五、状态机实现规范](#五状态机实现规范)
- [5.1 StateValidator](#51-statevalidator)
- [5.2 source_data 状态机](#52-source_data-状态机)
- [5.3 annotation_task 状态机](#53-annotation_task-状态机)
- [5.4 training_dataset 状态机](#54-training_dataset-状态机)
- [5.5 video_process_job 状态机](#55-video_process_job-状态机)
- [六、Docker Compose 配置](#六docker-compose-配置)
- [七、测试策略](#七测试策略)
- [7.1 基本原则](#71-基本原则)
- [7.2 并发任务领取测试(必须)](#72-并发任务领取测试必须)
- [7.3 视频回调幂等测试(必须)](#73-视频回调幂等测试必须)
- [7.4 状态机越界拒绝测试](#74-状态机越界拒绝测试)
- [7.5 多租户隔离测试](#75-多租户隔离测试)
- [八、宪章合规检查清单](#八宪章合规检查清单)
- [九、设计评审报告](#九设计评审报告)
- [9.1 发现并已修复的问题](#91-发现并已修复的问题)
- [9.2 未修复的设计选择(需业务澄清)](#92-未修复的设计选择需业务澄清)
- [9.3 修复后宪章合规状态](#93-修复后宪章合规状态)
- [9.4 审批流程合理性专项评审(第二轮)](#94-审批流程合理性专项评审第二轮)
- [9.5 数据库表设计完善性专项评审(第三轮)](#95-数据库表设计完善性专项评审第三轮)
---
## 一、项目总览
### 1.1 系统定位
label_backend 是知识图谱智能标注平台的后端服务,基于 Spring Boot 3 构建,驱动**文本线**和**图片线**两条标注流水线,将文档、图片、视频原始资料处理为 GLM 微调格式的训练数据集。
### 1.2 数据流水线
```
文本线: 文档 → 三元组提取(主语/谓语/宾语 + 原文片段 + 字符偏移)
→ 问答对生成 → 审批 → 训练样本GLM 文本格式)
图片线: 图片 → 四元组提取(主体/关系/客体/修饰词 + bbox + 裁剪图)
→ 问答对生成 → 审批 → 训练样本GLM 图文格式)
视频预处理(非独立流水线):
帧模式: 视频 → AI 抽帧 → 每帧作为图片进入图片线
片段模式: 视频 → 多模态模型转文本 → 派生 TEXT source_data → 进入文本线
```
### 1.3 技术栈矩阵
| 层次 | 技术 | 版本约束 |
|------|------|----------|
| 运行时 | JDK | 17LTS |
| 框架 | Spring Boot | ≥ 3.0.x |
| 认证/鉴权 | Apache Shiro | ≥ 1.13.x兼容 Spring Boot 3 |
| ORM | MyBatis Plus | ≥ 3.5.x |
| 数据库 | PostgreSQL | ≥ 14 |
| 缓存/锁 | Redis | ≥ 6.x |
| 对象存储 | RustFSS3 兼容接口) | 当前稳定版 |
| AI 服务 | Python FastAPIHTTP 调用) | JVM 侧仅作 RestClient 调用 |
| 容器化 | Docker Compose | ≥ 2.x |
| 构建工具 | Maven | ≥ 3.8 |
### 1.4 后端模块清单
| 模块 | 职责 |
|------|------|
| 用户与权限模块 | 用户管理、Shiro RBAC、Token 认证 |
| 资料管理模块 | 文件上传至 RustFS、source_data 元数据管理 |
| 任务池模块 | 任务创建、领取(分布式锁 + 乐观锁)、状态流转 |
| 标注工作台模块 | 调用 AI 提取三/四元组、annotation_result JSONB 写入 |
| 问答生成模块 | 调用 AI 生成候选问答对、training_dataset 管理 |
| 训练数据导出模块 | JSONL 批次导出、GLM 微调对接 |
| 系统配置模块 | Prompt 模板、模型参数、全局/公司级配置管理 |
| 视频处理模块 | 异步抽帧 / 转文本任务跟踪、幂等回调处理 |
### 1.5 包结构
```
com.label
├── LabelBackendApplication.java
├── common/
│ ├── result/ # Result<T>、ResultCode、PageResult<T>
│ ├── exception/ # BusinessException、GlobalExceptionHandler
│ ├── context/ # CompanyContextThreadLocal
│ ├── shiro/ # TokenFilter、UserRealm、ShiroConfig
│ ├── redis/ # RedisKeyManager、RedisService
│ ├── aop/ # AuditAspect、@OperationLog 注解
│ ├── storage/ # RustFsClientS3 兼容封装)
│ ├── ai/ # AiServiceClientRestClient 封装 8 个端点)
│ └── statemachine/ # StateValidator、各状态枚举
├── module/
│ ├── user/
│ │ ├── controller/ # AuthController、UserController
│ │ ├── service/ # AuthService、UserService
│ │ ├── mapper/ # SysUserMapper、SysCompanyMapper
│ │ ├── entity/ # SysUser、SysCompany
│ │ └── dto/ # LoginRequest、UserDTO、UserCreateRequest
│ ├── source/
│ │ ├── controller/ # SourceController
│ │ ├── service/ # SourceService
│ │ ├── mapper/ # SourceDataMapper
│ │ ├── entity/ # SourceData
│ │ └── dto/ # SourceUploadResponse、SourceListQuery
│ ├── task/
│ │ ├── controller/ # TaskController
│ │ ├── service/ # TaskService、TaskClaimService
│ │ ├── mapper/ # AnnotationTaskMapper、TaskHistoryMapper
│ │ ├── entity/ # AnnotationTask、AnnotationTaskHistory
│ │ └── dto/ # TaskCreateRequest、TaskPoolQuery、TaskDetailDTO
│ ├── annotation/
│ │ ├── controller/ # ExtractionController、QaController
│ │ ├── service/ # ExtractionService、QaService
│ │ ├── mapper/ # AnnotationResultMapper、TrainingDatasetMapper
│ │ ├── entity/ # AnnotationResult、TrainingDataset
│ │ └── dto/ # ExtractionResultDTO、QaItemDTO、RejectRequest
│ ├── export/
│ │ ├── controller/ # ExportController
│ │ ├── service/ # ExportService、FinetuneService
│ │ ├── mapper/ # ExportBatchMapper
│ │ ├── entity/ # ExportBatch
│ │ └── dto/ # ExportBatchRequest、FinetuneRequest
│ ├── config/
│ │ ├── controller/ # SysConfigController
│ │ ├── service/ # SysConfigService
│ │ ├── mapper/ # SysConfigMapper
│ │ ├── entity/ # SysConfig
│ │ └── dto/ # ConfigUpdateRequest
│ └── video/
│ ├── controller/ # VideoController
│ ├── service/ # VideoProcessService
│ ├── mapper/ # VideoProcessJobMapper
│ ├── entity/ # VideoProcessJob
│ └── dto/ # VideoProcessRequest、VideoCallbackRequest
```
[↑ 返回目录](#目录)
---
## 二、数据库 DDL
[↑ 返回目录](#目录)
> 执行顺序sys_company → sys_user → source_data → annotation_task → annotation_result → training_dataset → export_batch → sys_config → sys_operation_log → annotation_task_history → video_process_job
```sql
-- =============================================
-- 1. sys_company — 公司表(多租户根节点)
-- =============================================
CREATE TABLE sys_company (
id BIGSERIAL PRIMARY KEY,
company_name VARCHAR(100) NOT NULL UNIQUE,
company_code VARCHAR(50) NOT NULL UNIQUE,
status VARCHAR(10) NOT NULL DEFAULT 'ACTIVE', -- ACTIVE / DISABLED
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
-- =============================================
-- 2. sys_user — 用户表
-- =============================================
CREATE TABLE sys_user (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES sys_company(id),
username VARCHAR(50) NOT NULL,
password_hash VARCHAR(255) NOT NULL, -- BCrypt强度因子 >= 10
real_name VARCHAR(50),
role VARCHAR(20) NOT NULL, -- UPLOADER / ANNOTATOR / REVIEWER / ADMIN
status VARCHAR(10) NOT NULL DEFAULT 'ACTIVE',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT uq_company_username UNIQUE (company_id, username)
);
CREATE INDEX idx_sys_user_company ON sys_user(company_id);
-- =============================================
-- 3. source_data — 原始资料元数据表
-- =============================================
CREATE TABLE source_data (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES sys_company(id),
uploader_id BIGINT REFERENCES sys_user(id),
data_type VARCHAR(20) NOT NULL, -- TEXT / IMAGE / VIDEO
file_path VARCHAR(500) NOT NULL,
file_name VARCHAR(255) NOT NULL,
file_size BIGINT,
bucket_name VARCHAR(100) NOT NULL,
parent_source_id BIGINT REFERENCES source_data(id), -- 视频转文本时指向原视频
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
-- PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED / REJECTED
reject_reason TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_source_data_company ON source_data(company_id);
CREATE INDEX idx_source_data_status ON source_data(company_id, status);
CREATE INDEX idx_source_data_parent ON source_data(parent_source_id);
-- =============================================
-- 4. annotation_task — 标注任务表
-- =============================================
CREATE TABLE annotation_task (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES sys_company(id),
source_id BIGINT NOT NULL REFERENCES source_data(id),
phase VARCHAR(20) NOT NULL, -- EXTRACTION / QA_GENERATION
task_type VARCHAR(20) NOT NULL, -- AI_ASSISTED / MANUAL
ai_model VARCHAR(50),
video_unit_type VARCHAR(20), -- FRAME仅视频帧模式填写其余为空
video_unit_info JSONB, -- {"frame_index":150,"time_sec":5.0,"frame_path":"..."}
claimed_by BIGINT REFERENCES sys_user(id),
claimed_at TIMESTAMP,
status VARCHAR(20) NOT NULL DEFAULT 'UNCLAIMED',
-- UNCLAIMED / IN_PROGRESS / SUBMITTED / APPROVED / REJECTED
reject_reason TEXT,
submitted_at TIMESTAMP,
completed_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_annotation_task_company ON annotation_task(company_id);
CREATE INDEX idx_annotation_task_pool ON annotation_task(company_id, phase, status);
CREATE INDEX idx_annotation_task_claimed ON annotation_task(claimed_by, status);
-- =============================================
-- 5. annotation_result — 标注结果表EXTRACTION 阶段JSONB 聚合)
-- =============================================
CREATE TABLE annotation_result (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES sys_company(id),
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
result_json JSONB NOT NULL, -- 整体覆盖,禁止局部 PATCH
is_final BOOLEAN NOT NULL DEFAULT FALSE,
submitted_by BIGINT REFERENCES sys_user(id),
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_annotation_result_task ON annotation_result(task_id);
CREATE INDEX idx_annotation_result_final ON annotation_result(company_id, is_final);
-- =============================================
-- 6. training_dataset — 训练样本表(问答对终态)
-- =============================================
CREATE TABLE training_dataset (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES sys_company(id),
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
source_id BIGINT NOT NULL REFERENCES source_data(id),
extraction_result_id BIGINT NOT NULL REFERENCES annotation_result(id),
sample_type VARCHAR(20) NOT NULL, -- TEXT / IMAGE / VIDEO_FRAME
glm_format_json JSONB NOT NULL, -- GLM 微调格式JSONB 支持字段级查询
export_batch_id VARCHAR(50), -- 未导出时为空
status VARCHAR(20) NOT NULL DEFAULT 'PENDING_REVIEW',
-- PENDING_REVIEW / APPROVED / REJECTED
reject_reason TEXT,
reviewed_by BIGINT REFERENCES sys_user(id),
exported_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_training_dataset_company ON training_dataset(company_id);
CREATE INDEX idx_training_dataset_status ON training_dataset(company_id, status);
CREATE INDEX idx_training_dataset_batch ON training_dataset(export_batch_id);
-- =============================================
-- 7. export_batch — 导出批次表
-- =============================================
CREATE TABLE export_batch (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES sys_company(id),
batch_uuid VARCHAR(50) NOT NULL UNIQUE,
dataset_file_path VARCHAR(500),
sample_count INT NOT NULL DEFAULT 0,
glm_job_id VARCHAR(100), -- 调用微调接口后填写,初始为 NULL
finetune_status VARCHAR(20) NOT NULL DEFAULT 'NOT_STARTED',
-- NOT_STARTED / RUNNING / SUCCESS / FAILED
error_message TEXT,
created_by BIGINT REFERENCES sys_user(id),
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_export_batch_company ON export_batch(company_id);
-- =============================================
-- 8. sys_config — 系统配置表(支持全局默认 + 公司级覆盖)
-- =============================================
CREATE TABLE sys_config (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT REFERENCES sys_company(id), -- NULL = 全局默认配置
config_key VARCHAR(100) NOT NULL,
config_value TEXT NOT NULL,
description TEXT,
updated_by BIGINT REFERENCES sys_user(id),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT uq_config_company_key UNIQUE (company_id, config_key)
);
-- 预置全局配置项
INSERT INTO sys_config (company_id, config_key, config_value, description) VALUES
(NULL, 'prompt_extract_text', '...', '文本三元组提取 Prompt 模板'),
(NULL, 'prompt_extract_image', '...', '图像四元组提取 Prompt 模板'),
(NULL, 'prompt_video_to_text', '...', '视频转文本 Prompt 模板'),
(NULL, 'prompt_qa_gen_text', '...', '文本问答对生成 Prompt 模板'),
(NULL, 'prompt_qa_gen_image', '...', '图像问答对生成 Prompt 模板'),
(NULL, 'model_default', 'glm-4', '默认 AI 辅助模型'),
(NULL, 'video_frame_interval', '30', '视频帧模式抽帧间隔(帧数)'),
(NULL, 'token_ttl_seconds', '7200', 'Token 有效期(秒)'),
(NULL, 'glm_api_base_url', 'http://ai-service:8000', 'GLM API 地址');
-- =============================================
-- 9. sys_operation_log — 操作审计日志(只追加,按月分区)
-- =============================================
CREATE TABLE sys_operation_log (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT REFERENCES sys_company(id),
operator_id BIGINT REFERENCES sys_user(id), -- 登录失败时可为 NULL
operator_name VARCHAR(50) NOT NULL, -- 操作时用户名快照
operation_type VARCHAR(50) NOT NULL,
target_type VARCHAR(30),
target_id BIGINT,
detail JSONB,
ip_address VARCHAR(50),
result VARCHAR(10) NOT NULL, -- SUCCESS / FAIL
error_message TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (created_at);
-- 初始分区(建议使用 pg_partman 自动维护)
CREATE TABLE sys_operation_log_2026_04
PARTITION OF sys_operation_log
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE sys_operation_log_2026_05
PARTITION OF sys_operation_log
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
-- =============================================
-- 10. annotation_task_history — 任务流转历史(只追加)
-- =============================================
CREATE TABLE annotation_task_history (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES sys_company(id),
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
from_status VARCHAR(20), -- 任务初建时为 NULL
to_status VARCHAR(20) NOT NULL,
operator_id BIGINT NOT NULL REFERENCES sys_user(id),
operator_role VARCHAR(20) NOT NULL, -- 操作时角色快照
note TEXT, -- 驳回原因、强制转移说明等
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_task_history_task ON annotation_task_history(task_id);
-- =============================================
-- 11. video_process_job — 视频异步处理任务表
-- =============================================
CREATE TABLE video_process_job (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES sys_company(id),
source_id BIGINT NOT NULL REFERENCES source_data(id),
job_type VARCHAR(20) NOT NULL, -- FRAME_EXTRACT / VIDEO_TO_TEXT
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
-- PENDING / RUNNING / SUCCESS / FAILED / RETRYING
params JSONB NOT NULL,
total_units INT,
processed_units INT NOT NULL DEFAULT 0,
output_path VARCHAR(500),
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
error_message TEXT,
started_at TIMESTAMP,
completed_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_video_process_job_source ON video_process_job(source_id);
CREATE INDEX idx_video_process_job_status ON video_process_job(status);
```
---
## 三、公共基础设施
[↑ 返回目录](#目录)
### 3.1 统一响应封装
```java
// com/label/common/result/Result.java
public record Result<T>(String code, T data, String message) {
public static <T> Result<T> ok(T data) {
return new Result<>("SUCCESS", data, null);
}
public static Result<Void> ok() {
return new Result<>("SUCCESS", null, null);
}
public static <T> Result<T> fail(String code, String message) {
return new Result<>(code, null, message);
}
}
// com/label/common/result/PageResult.java
public record PageResult<T>(List<T> items, long total, int page, int pageSize) {}
```
所有 Controller 返回 `Result<T>`,禁止直接返回裸 POJO 或裸 List。分页接口返回 `Result<PageResult<T>>`
### 3.2 全局异常处理
```java
// com/label/common/exception/GlobalExceptionHandler.java
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(BusinessException.class)
public ResponseEntity<Result<?>> handleBusiness(BusinessException ex) {
return ResponseEntity.status(ex.getHttpStatus())
.body(Result.fail(ex.getCode(), ex.getMessage()));
}
@ExceptionHandler(UnauthorizedException.class)
public ResponseEntity<Result<?>> handleUnauthorized() {
return ResponseEntity.status(403)
.body(Result.fail("FORBIDDEN", "权限不足"));
}
@ExceptionHandler(Exception.class)
public ResponseEntity<Result<?>> handleUnexpected(Exception ex, HttpServletRequest req) {
log.error("Unexpected error on {}", req.getRequestURI(), ex);
// 生产响应禁止包含堆栈跟踪
return ResponseEntity.status(500)
.body(Result.fail("INTERNAL_ERROR", "服务器内部错误"));
}
}
```
### 3.3 多租户 CompanyContextThreadLocal
```java
// com/label/common/context/CompanyContext.java
public final class CompanyContext {
private static final ThreadLocal<Long> COMPANY_ID = new ThreadLocal<>();
public static void set(Long companyId) { COMPANY_ID.set(companyId); }
public static Long get() { return COMPANY_ID.get(); }
public static void clear() { COMPANY_ID.remove(); } // 必须在 finally 块调用
}
```
在 Shiro `TokenFilter``executeLogin` 成功后注入;在过滤器的 `finally` 块调用 `CompanyContext.clear()`,防止线程池复用时数据串漏至其他租户。
**禁止**调用方通过请求体传入 `company_id` 作为参数。所有 Service 通过 `CompanyContext.get()` 获取。
MyBatis Plus 配置 `TenantLineInnerInterceptor` 自动向所有 Mapper 查询注入 `company_id = ?` 条件:
```java
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(
new TenantLineHandler() {
public Expression getTenantId() {
return new LongValue(CompanyContext.get());
}
public String getTenantIdColumn() { return "company_id"; }
}
));
return interceptor;
}
```
### 3.4 Shiro 配置
#### TokenFilter继承 AuthenticatingFilter
```java
// com/label/common/shiro/TokenFilter.java
public class TokenFilter extends AuthenticatingFilter {
@Override
protected AuthenticationToken createToken(ServletRequest req, ServletResponse res) {
String token = extractToken((HttpServletRequest) req);
return new TokenAuthenticationToken(token);
}
@Override
protected boolean onAccessDenied(ServletRequest req, ServletResponse res) throws Exception {
String token = extractToken((HttpServletRequest) req);
if (token == null) {
sendUnauthorized(res, "缺少 Token");
return false;
}
return executeLogin(req, res);
}
@Override
protected boolean onLoginSuccess(AuthenticationToken token, Subject subject,
ServletRequest req, ServletResponse res) throws Exception {
UserSession session = (UserSession) subject.getPrincipal();
// 滑动过期:每次有效请求重置 TTL
long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
redisTemplate.expire(RedisKeyManager.tokenKey(session.getToken()), ttl, SECONDS);
// 注入租户上下文(在整个 Filter 链执行期间有效,由 doFilterInternal 的 finally 清理)
CompanyContext.set(session.getCompanyId());
return true; // 继续执行 Filter 链(进入 Controller
}
// ⚠️ 必须在此处清理 ThreadLocal而非 onLoginSuccess——
// onLoginSuccess 返回 true 后 Filter 链才继续执行,此时 Controller 尚未运行。
// doFilterInternal 的 finally 块保证请求结束后(无论正常还是异常)都清理上下文。
@Override
protected void doFilterInternal(ServletRequest req, ServletResponse res, FilterChain chain)
throws ServletException, IOException {
try {
super.doFilterInternal(req, res, chain);
} finally {
CompanyContext.clear();
}
}
private String extractToken(HttpServletRequest req) {
String header = req.getHeader("Authorization");
if (header != null && header.startsWith("Bearer ")) {
return header.substring(7);
}
return null;
}
}
```
#### UserRealm继承 AuthorizingRealm
```java
// com/label/common/shiro/UserRealm.java
public class UserRealm extends AuthorizingRealm {
// 认证:从 Redis 验证 token:{uuid} 是否存在
@Override
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) {
String uuid = ((TokenAuthenticationToken) token).getToken();
Map<Object, Object> data = redisTemplate.opsForHash()
.entries(RedisKeyManager.tokenKey(uuid));
if (data.isEmpty()) throw new UnknownAccountException("Token 无效或已过期");
UserSession session = UserSession.from(uuid, data);
return new SimpleAuthenticationInfo(session, uuid, getName());
}
// 鉴权:先查 Redis user:perm:{userId},未命中查 PostgreSQL
@Override
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
UserSession session = (UserSession) principals.getPrimaryPrincipal();
String permKey = RedisKeyManager.userPermKey(session.getUserId());
String cachedRole = (String) redisTemplate.opsForValue().get(permKey);
if (cachedRole == null) {
SysUser user = sysUserMapper.selectById(session.getUserId());
cachedRole = user.getRole();
redisTemplate.opsForValue().set(permKey, cachedRole, 5, MINUTES);
}
SimpleAuthorizationInfo info = new SimpleAuthorizationInfo();
info.addRole(cachedRole);
// 角色继承:高级角色包含所有低级角色权限
addInheritedRoles(info, cachedRole);
return info;
}
private void addInheritedRoles(SimpleAuthorizationInfo info, String role) {
// ADMIN ⊃ REVIEWER ⊃ ANNOTATOR ⊃ UPLOADER
switch (role) {
case "ADMIN" -> info.addRoles(Set.of("REVIEWER", "ANNOTATOR", "UPLOADER"));
case "REVIEWER" -> info.addRoles(Set.of("ANNOTATOR", "UPLOADER"));
case "ANNOTATOR" -> info.addRoles(Set.of("UPLOADER"));
}
}
}
```
#### ShiroConfig 过滤器链
```java
@Bean
public ShiroFilterChainDefinition shiroFilterChainDefinition() {
DefaultShiroFilterChainDefinition chain = new DefaultShiroFilterChainDefinition();
chain.addPathDefinition("/api/auth/login", "anon");
chain.addPathDefinition("/api/**", "tokenFilter");
return chain;
}
```
权限声明使用注解,禁止在 Service 内使用 if-role 临时判断:
```java
@RequiresRoles("ADMIN")
public void createTask(...)
@RequiresRoles(value = {"ANNOTATOR", "REVIEWER", "ADMIN"}, logical = Logical.OR)
public void claimTask(...)
```
#### 角色变更立即驱逐缓存
```java
// UserService.updateRole() 内
@Transactional
public void updateRole(Long userId, String newRole) {
userMapper.updateRole(userId, newRole);
// 立即驱逐,不等 TTL 自然过期(延迟在禁用高权限账号时存在安全窗口)
redisTemplate.delete(RedisKeyManager.userPermKey(userId));
}
```
### 3.5 Redis Key 管理
```java
// com/label/common/redis/RedisKeyManager.java
public final class RedisKeyManager {
public static String tokenKey(String uuid) { return "token:" + uuid; }
public static String userPermKey(Long userId) { return "user:perm:" + userId; }
public static String taskClaimKey(Long taskId){ return "task:claim:" + taskId; }
}
```
禁止在上述三类命名空间之外自造 Key 用于认证、权限或锁目的。
### 3.6 AOP 审计日志切面
```java
// com/label/common/aop/OperationLog.java注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface OperationLog {
String type(); // 对应 operation_type 枚举值,如 "TASK_CLAIM"
String targetType() default "";
}
// com/label/common/aop/AuditAspect.java
@Aspect
@Component
public class AuditAspect {
@Around("@annotation(operationLog)")
public Object audit(ProceedingJoinPoint pjp, OperationLog operationLog) throws Throwable {
String result = "SUCCESS";
String errorMsg = null;
Object returnVal = null;
try {
returnVal = pjp.proceed();
} catch (Throwable ex) {
result = "FAIL";
errorMsg = ex.getMessage();
throw ex;
} finally {
// 业务事务已 commit/rollback 后,以独立操作写入审计记录
// 审计写入失败只记录 error 日志,禁止回滚业务事务
try {
saveAuditLog(operationLog.type(), operationLog.targetType(), result, errorMsg);
} catch (Exception auditEx) {
log.error("审计日志写入失败: type={}", operationLog.type(), auditEx);
}
}
return returnVal;
}
private void saveAuditLog(String type, String targetType, String result, String errorMsg) {
UserSession session = getCurrentSession();
SysOperationLog log = SysOperationLog.builder()
.companyId(CompanyContext.get())
.operatorId(session != null ? session.getUserId() : null)
.operatorName(session != null ? session.getUsername() : "unknown")
.operationType(type)
.targetType(targetType.isEmpty() ? null : targetType)
.result(result)
.errorMessage(errorMsg)
.ipAddress(getClientIp())
.build();
operationLogMapper.insert(log);
}
}
```
### 3.7 RustFS S3 客户端
RustFS 实现 S3 兼容接口,使用 AWS SDK for Java v2 连接:
```java
// com/label/common/storage/RustFsClient.java
@Component
public class RustFsClient {
private final S3Client s3Client; // 配置 endpoint 指向 RustFS 地址
public String upload(String bucket, String key, InputStream data, long size) {
PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).build();
s3Client.putObject(req, RequestBody.fromInputStream(data, size));
return key;
}
public InputStream download(String bucket, String key) {
GetObjectRequest req = GetObjectRequest.builder().bucket(bucket).key(key).build();
return s3Client.getObject(req);
}
public void delete(String bucket, String key) {
s3Client.deleteObject(b -> b.bucket(bucket).key(key));
}
public String getPresignedUrl(String bucket, String key, Duration expiry) {
S3Presigner presigner = S3Presigner.builder().endpointOverride(endpointUri).build();
return presigner.presignGetObject(b -> b.signatureDuration(expiry)
.getObjectRequest(r -> r.bucket(bucket).key(key))).url().toString();
}
}
```
**对象存储路径规范:**
| 资源类型 | 存储桶 | 路径格式 |
|----------|--------|----------|
| 上传文本文件 | `source-data` | `text/{yyyyMM}/{source_id}.txt` |
| 上传图片 | `source-data` | `image/{yyyyMM}/{source_id}.jpg` |
| 上传视频 | `source-data` | `video/{yyyyMM}/{source_id}.mp4` |
| 视频帧图 | `source-data` | `frames/{source_id}/{frame_index}.jpg` |
| 视频片段转译文本 | `source-data` | `video-text/{parent_source_id}/{timestamp}.txt` |
| bbox 裁剪图 | `source-data` | `crops/{task_id}/{item_index}.jpg` |
| 导出 JSONL | `finetune-export` | `export/{batchUuid}.jsonl` |
### 3.8 AI 服务 HTTP 客户端
Spring Boot 3 使用 `RestClient` 封装对 Python FastAPI 服务的同步 HTTP 调用:
```java
// com/label/common/ai/AiServiceClient.java
@Component
public class AiServiceClient {
private final RestClient restClient;
public AiServiceClient(@Value("${ai.service.base-url}") String baseUrl) {
this.restClient = RestClient.builder().baseUrl(baseUrl).build();
}
// POST /api/v1/text/extract
public TextExtractResponse extractText(String filePath, String model) {
return restClient.post().uri("/api/v1/text/extract")
.body(Map.of("file_path", filePath, "model", model))
.retrieve().body(TextExtractResponse.class);
}
// POST /api/v1/image/extract
public ImageExtractResponse extractImage(String imagePath, String model) {
return restClient.post().uri("/api/v1/image/extract")
.body(Map.of("image_path", imagePath, "model", model))
.retrieve().body(ImageExtractResponse.class);
}
// POST /api/v1/video/extract-frames
public VideoFramesResponse extractFrames(String videoPath, String frameInterval, String mode) {
return restClient.post().uri("/api/v1/video/extract-frames")
.body(Map.of("video_path", videoPath, "frame_interval", frameInterval, "mode", mode))
.retrieve().body(VideoFramesResponse.class);
}
// POST /api/v1/video/to-text
public VideoToTextResponse videoToText(String videoPath, int startSec, int endSec, String model) {
return restClient.post().uri("/api/v1/video/to-text")
.body(Map.of("video_path", videoPath, "start_sec", startSec, "end_sec", endSec, "model", model))
.retrieve().body(VideoToTextResponse.class);
}
// POST /api/v1/qa/gen-text
public QaGenResponse genTextQa(List<?> triples, String model, String promptTemplate) {
return restClient.post().uri("/api/v1/qa/gen-text")
.body(Map.of("items", triples, "model", model, "prompt_template", promptTemplate))
.retrieve().body(QaGenResponse.class);
}
// POST /api/v1/qa/gen-image
public QaGenResponse genImageQa(List<?> quadruples, String model, String promptTemplate) {
return restClient.post().uri("/api/v1/qa/gen-image")
.body(Map.of("items", quadruples, "model", model, "prompt_template", promptTemplate))
.retrieve().body(QaGenResponse.class);
}
// POST /api/v1/finetune/start
public FinetuneStartResponse startFinetune(String jsonlUrl, String baseModel, Map<String, Object> params) {
return restClient.post().uri("/api/v1/finetune/start")
.body(Map.of("jsonl_url", jsonlUrl, "base_model", baseModel, "params", params))
.retrieve().body(FinetuneStartResponse.class);
}
// GET /api/v1/finetune/status/{jobId}
public FinetuneStatusResponse getFinetuneStatus(String jobId) {
return restClient.get().uri("/api/v1/finetune/status/{jobId}", jobId)
.retrieve().body(FinetuneStatusResponse.class);
}
}
```
---
## 四、业务模块纵切
[↑ 返回目录](#目录)
### 4.1 用户与权限模块
**EntitySysUser**:字段同 DDL`passwordHash` 字段加 `@JsonIgnore` 禁止序列化到响应。
**AuthService 核心逻辑:**
```java
// 登录
@OperationLog(type = "USER_LOGIN")
public String login(Long companyId, String username, String password, String ip) {
SysUser user = userMapper.selectByCompanyAndUsername(companyId, username);
if (user == null || !BCrypt.checkpw(password, user.getPasswordHash()))
throw new BusinessException("USER_NOT_FOUND", "用户名或密码错误");
if ("DISABLED".equals(user.getStatus()))
throw new BusinessException("USER_DISABLED", "账号已禁用");
String token = UUID.randomUUID().toString();
Map<String, Object> tokenData = Map.of(
"userId", user.getId(), "role", user.getRole(), "companyId", user.getCompanyId(),
"username", user.getUsername()
);
long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
redisTemplate.opsForHash().putAll(RedisKeyManager.tokenKey(token), tokenData);
redisTemplate.expire(RedisKeyManager.tokenKey(token), ttl, SECONDS);
return token;
}
// 退出登录
@OperationLog(type = "USER_LOGOUT")
public void logout(String token) {
redisTemplate.delete(RedisKeyManager.tokenKey(token));
}
```
**接口清单:**
| 方法 | 路径 | 最低权限 | 说明 |
|------|------|----------|------|
| POST | `/api/auth/login` | 匿名 | 返回 TokenUUID |
| POST | `/api/auth/logout` | 已登录 | 删除 Redis Token |
| GET | `/api/auth/me` | 已登录 | 当前用户信息与角色 |
| GET | `/api/users` | ADMIN | 分页查询用户列表 |
| POST | `/api/users` | ADMIN | 创建用户 |
| PUT | `/api/users/{id}` | ADMIN | 更新用户信息 |
| PUT | `/api/users/{id}/status` | ADMIN | 启用/禁用账号(同步驱逐权限缓存) |
| PUT | `/api/users/{id}/role` | ADMIN | 变更角色(同步驱逐权限缓存) |
---
### 4.2 资料管理模块
**SourceService 核心逻辑:**
```java
@OperationLog(type = "SOURCE_UPLOAD")
@Transactional
public SourceData upload(MultipartFile file, String dataType) {
Long companyId = CompanyContext.get();
// 1. 先创建 source_data 记录获取 ID用于构造存储路径
SourceData sd = SourceData.builder()
.companyId(companyId).uploaderId(getCurrentUserId())
.dataType(dataType).fileName(file.getOriginalFilename())
.fileSize(file.getSize()).bucketName("source-data")
.status("PENDING").build();
sourceDataMapper.insert(sd);
// 2. 构造路径并上传至 RustFS
String path = buildPath(dataType, sd.getId());
rustFsClient.upload("source-data", path, file.getInputStream(), file.getSize());
// 3. 更新 file_path
sd.setFilePath(path);
sourceDataMapper.updateById(sd);
return sd;
}
private String buildPath(String dataType, Long sourceId) {
String ym = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
return switch (dataType) {
case "TEXT" -> "text/" + ym + "/" + sourceId + ".txt";
case "IMAGE" -> "image/" + ym + "/" + sourceId + ".jpg";
case "VIDEO" -> "video/" + ym + "/" + sourceId + ".mp4";
default -> throw new BusinessException("INVALID_TYPE", "不支持的资料类型");
};
}
```
**接口清单:**
| 方法 | 路径 | 最低权限 | 说明 |
|------|------|----------|------|
| POST | `/api/source/upload` | UPLOADER | 上传文件,创建 source_data 记录 |
| GET | `/api/source/list` | UPLOADER | 分页查询UPLOADER 只见自己ADMIN 见全部) |
| GET | `/api/source/{id}` | UPLOADER | 查看资料详情 |
| DELETE | `/api/source/{id}` | ADMIN | 删除资料及 RustFS 文件 |
---
### 4.3 任务管理模块(含并发控制)
**TaskClaimService.claim() — 双重保障:**
```java
@Transactional
@OperationLog(type = "TASK_CLAIM")
public void claim(Long taskId) {
Long userId = getCurrentUserId();
// 第一重Redis 分布式锁TTL 30sSET NX
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent(RedisKeyManager.taskClaimKey(taskId), userId.toString(), 30, SECONDS);
if (!Boolean.TRUE.equals(locked))
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取");
// 第二重数据库乐观约束WHERE status = 'UNCLAIMED'
// UPDATE annotation_task SET status='IN_PROGRESS', claimed_by=?, claimed_at=NOW()
// WHERE id=? AND status='UNCLAIMED' AND company_id=?
int affected = taskMapper.claimTask(taskId, userId, CompanyContext.get());
if (affected == 0)
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取");
// 写入任务历史(同一事务)
insertHistory(taskId, "UNCLAIMED", "IN_PROGRESS", userId, null);
}
public void unclaim(Long taskId) {
StateValidator.assertTransition(TaskStatus.IN_PROGRESS, TaskStatus.UNCLAIMED, TaskStatus.TRANSITIONS);
// 更新 claimed_by = NULL, status = UNCLAIMED
taskMapper.unclaim(taskId, getCurrentUserId(), CompanyContext.get());
redisTemplate.delete(RedisKeyManager.taskClaimKey(taskId));
insertHistory(taskId, "IN_PROGRESS", "UNCLAIMED", getCurrentUserId(), null);
}
```
**每次 status 变更必须在同一事务中调用 `insertHistory()` 写入 `annotation_task_history`。**
**接口清单:**
| 方法 | 路径 | 最低权限 | 说明 |
|------|------|----------|------|
| POST | `/api/tasks` | ADMIN | 为指定 source 创建 EXTRACTION 任务 |
| GET | `/api/tasks/pool` | ANNOTATOR | 查看可领取任务列表(按角色过滤,分页)。**角色过滤规则**ANNOTATOR 返回 `phase=EXTRACTION AND status=UNCLAIMED`REVIEWER 返回 `phase=EXTRACTION AND status=UNCLAIMED` `phase=QA_GENERATION AND status=UNCLAIMED`REVIEWER 含 ANNOTATOR 继承权限可领取两类任务ADMIN 返回所有 phase、所有 status 的任务(走 `GET /api/tasks`)。 |
| GET | `/api/tasks/pending-review` | REVIEWER | **审批收件箱**:返回 `status=SUBMITTED` 的任务列表分页。REVIEWER 看本公司全部待审批任务ADMIN 同等权限。ANNOTATOR 无权访问此接口403。 |
| POST | `/api/tasks/{id}/claim` | ANNOTATOR | 领取任务(争抢式) |
| POST | `/api/tasks/{id}/unclaim` | ANNOTATOR | 放弃任务,退回任务池 |
| POST | `/api/tasks/{id}/reclaim` | ANNOTATOR | **重拾被驳回的任务**:仅当 `task.status=REJECTED AND task.claimed_by=currentUserId` 时合法;将 status 置为 IN_PROGRESS状态机 REJECTED → IN_PROGRESS写入任务历史。 |
| GET | `/api/tasks/mine` | ANNOTATOR | 查询我领取的任务列表(分页),**包含 REJECTED 状态**(让标注员发现被驳回的任务)。 |
| GET | `/api/tasks/{id}` | ANNOTATOR | 查看任务详情 |
| GET | `/api/tasks` | ADMIN | 查询全部任务(支持过滤,分页) |
| PUT | `/api/tasks/{id}/reassign` | ADMIN | 强制转移任务归属(更新 claimed_bytask status 保持 IN_PROGRESS向 annotation_task_history 写入 from=IN_PROGRESS / to=IN_PROGRESS + note=转移原因) |
---
### 4.4 标注工作台模块EXTRACTION 阶段)
**ExtractionService 核心逻辑:**
```java
// AI 辅助预标注(标注员领取任务后调用)
public AnnotationResult aiPreAnnotate(Long taskId) {
AnnotationTask task = taskMapper.selectById(taskId);
SourceData source = sourceDataMapper.selectById(task.getSourceId());
Object aiResult = "IMAGE".equals(source.getDataType()) || task.getVideoUnitType() != null
? aiServiceClient.extractImage(resolveImagePath(task), task.getAiModel())
: aiServiceClient.extractText(source.getFilePath(), task.getAiModel());
AnnotationResult result = buildAnnotationResult(task, aiResult);
annotationResultMapper.insertOrReplace(result);
return result;
}
// 更新提取结果整体覆盖PUT 语义)
public void updateResult(Long taskId, String resultJsonStr) {
// 验证 JSON 格式合法性
validateResultJson(resultJsonStr);
// 整体替换 result_json禁止局部 PATCH 或逐条追加
annotationResultMapper.updateResultJson(taskId, resultJsonStr, CompanyContext.get());
}
// 审批通过——两阶段设计:
// 阶段一(同步 @Transactional完成审批核心动作步骤 1-3立即返回
// 阶段二异步事件AI 调用 + QA 任务创建(步骤 4-7由 ExtractionApprovedEvent 驱动
// ⚠️ 禁止将 AI HTTP 调用放入 @Transactional 内:会长期占用 DB 连接,且 AI 失败会回滚已完成的审批动作
// ⚠️ 自审防护提交人claimed_by不能同时作为审批人
@Transactional
@OperationLog(type = "EXTRACTION_APPROVE")
public void approve(Long taskId) {
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
if (task.getClaimedBy().equals(getCurrentUserId()))
throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能审批自己提交的任务");
AnnotationResult result = annotationResultMapper.selectByTaskId(taskId);
// 1. annotation_result.is_final = true
result.setIsFinal(true);
annotationResultMapper.updateById(result);
// 2. annotation_task.status → APPROVED
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS);
task.setStatus("APPROVED");
task.setCompletedAt(LocalDateTime.now());
taskMapper.updateById(task);
// 3. 写入任务历史
insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);
// ---- 事务边界 ----
// 步骤 4-7 通过 Spring ApplicationEvent 在事务提交后异步执行,
// 避免 AI HTTP 调用阻塞事务(参见 ExtractionApprovedEventListener
applicationEventPublisher.publishEvent(new ExtractionApprovedEvent(taskId, task.getSourceId()));
}
// ExtractionApprovedEventListener@TransactionalEventListener(phase = AFTER_COMMIT)@Async
// 4. 调用 AI 生成候选问答对
// String promptKey = "IMAGE".equals(getSourceType(task)) ? "prompt_qa_gen_image" : "prompt_qa_gen_text";
// QaGenResponse qaResponse = generateQa(task, result, promptTemplate);
//
// 5. 将候选问答对写入 training_datasetPENDING_REVIEW
//
// 6. 创建 QA_GENERATION 阶段任务UNCLAIMED
//
// 7. source_data.status → QA_REVIEW
```
**ExtractionService.reject() 核心逻辑:**
```java
// 驳回——task 回退至 REJECTED由标注员重新 claim 后进入 IN_PROGRESS
@Transactional
@OperationLog(type = "EXTRACTION_REJECT")
public void reject(Long taskId, String reason) {
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
if (task.getClaimedBy().equals(getCurrentUserId()))
throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能驳回自己提交的任务");
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS);
task.setStatus("REJECTED");
task.setRejectReason(reason);
taskMapper.updateById(task);
insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason);
// ⚠️ source_data 状态保持 EXTRACTING不回退等标注员重新 claim 后继续修改
}
```
> **source_data EXTRACTING 状态说明**:任务从任务池创建(`ADMIN POST /api/tasks`)时,后端同步将 `source_data.status → EXTRACTING`。EXTRACTION 审批驳回后 source_data 保持 EXTRACTING 不变,标注员重新领取修改后重提,审批通过后再推进至 QA_REVIEW。
**接口清单:**
| 方法 | 路径 | 最低权限 | 说明 |
|------|------|----------|------|
| GET | `/api/extraction/{taskId}` | ANNOTATOR | 获取当前提取结果(含 AI 预标注) |
| PUT | `/api/extraction/{taskId}` | ANNOTATOR | 更新提取结果(整体 JSONB 覆盖) |
| POST | `/api/extraction/{taskId}/submit` | ANNOTATOR | 提交提取结果,进入审批队列 |
| POST | `/api/extraction/{taskId}/approve` | REVIEWER | 审批通过(自审防护:不能审批自己提交的任务),自动触发 QA 任务创建 |
| POST | `/api/extraction/{taskId}/reject` | REVIEWER | 驳回自审防护同上附驳回原因task → REJECTED标注员需重新 claim |
---
### 4.5 问答生成模块QA_GENERATION 阶段)
`QaService` 的整体覆盖逻辑与 `ExtractionService` 一致PUT 语义,禁止局部 PATCH
**QaService.reject() 核心逻辑:**
```java
@Transactional
@OperationLog(type = "QA_REJECT")
public void reject(Long taskId, String reason) {
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
if (task.getClaimedBy().equals(getCurrentUserId()))
throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能驳回自己提交的任务");
// training_dataset.status → REJECTED标注员修改后重提状态回 PENDING_REVIEW
trainingDatasetMapper.rejectByTaskId(taskId, reason, CompanyContext.get());
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS);
task.setStatus("REJECTED");
task.setRejectReason(reason);
taskMapper.updateById(task);
insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason);
// source_data 保持 QA_REVIEW等标注员重新领取修改后重提
}
```
**approve 级联动作(同一事务):**
```java
@Transactional
@OperationLog(type = "QA_APPROVE")
public void approve(Long taskId) {
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
if (task.getClaimedBy().equals(getCurrentUserId()))
throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能审批自己提交的任务");
// 1. training_dataset.status → APPROVED
trainingDatasetMapper.approveByTaskId(taskId, getCurrentUserId(), CompanyContext.get());
// 2. annotation_task.status → APPROVED
task.setStatus("APPROVED");
task.setCompletedAt(LocalDateTime.now());
taskMapper.updateById(task);
// 3. source_data.status → APPROVED整条流水线完成
sourceDataMapper.updateStatus(task.getSourceId(), "APPROVED", CompanyContext.get());
// 4. 写入任务历史
insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);
}
```
**接口清单:**
| 方法 | 路径 | 最低权限 | 说明 |
|------|------|----------|------|
| GET | `/api/qa/{taskId}` | ANNOTATOR | 获取候选问答对列表 |
| PUT | `/api/qa/{taskId}` | ANNOTATOR | 修改问答对(整体覆盖) |
| POST | `/api/qa/{taskId}/submit` | ANNOTATOR | 提交问答对,进入审批队列 |
| POST | `/api/qa/{taskId}/approve` | REVIEWER | 审批通过自审防护training_dataset → APPROVEDsource_data → APPROVED |
| POST | `/api/qa/{taskId}/reject` | REVIEWER | 驳回自审防护training_dataset → REJECTED标注员需重新 claim 修改后重提(重提后 training_dataset → PENDING_REVIEW |
---
### 4.6 训练数据导出模块
**ExportService.createBatch() 核心逻辑:**
```java
@Transactional
@OperationLog(type = "EXPORT_CREATE")
public ExportBatch createBatch(List<Long> sampleIds) {
Long companyId = CompanyContext.get();
// 1. 校验样本均为 APPROVED 状态
List<TrainingDataset> samples = trainingDatasetMapper
.selectByIdsAndStatus(sampleIds, "APPROVED", companyId);
if (samples.size() != sampleIds.size())
throw new BusinessException("INVALID_SAMPLES", "部分样本不处于 APPROVED 状态");
// 2. 生成 JSONL 内容(每行一个 glm_format_json
String batchUuid = UUID.randomUUID().toString();
String jsonl = samples.stream()
.map(s -> s.getGlmFormatJson().toString())
.collect(Collectors.joining("\n"));
// 3. 上传至 RustFSfinetune-export/export/{batchUuid}.jsonl
String path = "export/" + batchUuid + ".jsonl";
byte[] bytes = jsonl.getBytes(StandardCharsets.UTF_8);
rustFsClient.upload("finetune-export", path, new ByteArrayInputStream(bytes), bytes.length);
// 4. 更新样本 export_batch_id 与 exported_at
trainingDatasetMapper.batchUpdateExportInfo(sampleIds, batchUuid, LocalDateTime.now());
// 5. 写入 export_batch 记录
ExportBatch batch = ExportBatch.builder()
.companyId(companyId).batchUuid(batchUuid).datasetFilePath(path)
.sampleCount(samples.size()).finetuneStatus("NOT_STARTED")
.createdBy(getCurrentUserId()).build();
exportBatchMapper.insert(batch);
return batch;
}
```
**FinetuneService.trigger()** 调用 `aiServiceClient.startFinetune(...)` 获取 `glmJobId`,更新 `export_batch.glm_job_id``finetune_status = RUNNING`
**接口清单(全部需要 ADMIN 权限):**
| 方法 | 路径 | 说明 |
|------|------|------|
| GET | `/api/training/samples` | 分页查询已审批通过、待导出的样本 |
| POST | `/api/export/batch` | 创建导出批次,合并 JSONL 并上传 RustFS |
| POST | `/api/export/{batchId}/finetune` | 向 GLM 工厂提交微调任务 |
| GET | `/api/export/{batchId}/status` | 查询微调任务状态 |
| GET | `/api/export/list` | 分页查询所有导出批次 |
---
### 4.7 系统配置模块
**SysConfigService.get(configKey)** 先按 `(companyId, configKey)` 查,未命中则按 `(NULL, configKey)` 查全局默认值,公司级配置优先覆盖全局值。
```java
public String get(String configKey) {
Long companyId = CompanyContext.get();
// 先查公司级配置
SysConfig cfg = configMapper.selectByCompanyAndKey(companyId, configKey);
if (cfg == null) {
// 回退到全局默认company_id IS NULL
cfg = configMapper.selectByCompanyAndKey(null, configKey);
}
return cfg != null ? cfg.getConfigValue() : null;
}
```
**接口清单(全部需要 ADMIN 权限):**
| 方法 | 路径 | 说明 |
|------|------|------|
| GET | `/api/config` | 获取所有配置项(公司级 + 全局默认) |
| PUT | `/api/config/{key}` | 更新单项配置(含 Prompt 模板) |
---
### 4.8 视频处理模块
**接口清单(全部需要 ADMIN 权限):**
| 方法 | 路径 | 说明 |
|------|------|------|
| POST | `/api/video/{sourceId}/process-frames` | 触发帧模式预处理(创建 FRAME_EXTRACT 任务source_data → PREPROCESSING |
| POST | `/api/video/{sourceId}/process-to-text` | 触发片段模式预处理(创建 VIDEO_TO_TEXT 任务source_data → PREPROCESSING |
| GET | `/api/video/{sourceId}/jobs` | 查询该视频的所有异步处理任务列表 |
| PUT | `/api/video/jobs/{jobId}/reset` | 将 FAILED 状态的任务手动重置为 PENDING仅 ADMIN 可操作) |
| POST | `/api/video/jobs/{jobId}/callback` | AI 服务回调接口(内网调用,不经过 Shiro Token 验证,通过 IP 白名单保护) |
**VideoProcessService 核心逻辑:**
```java
@Transactional
@OperationLog(type = "TASK_CREATE")
public VideoProcessJob createJob(Long sourceId, String jobType, JsonNode params) {
Long companyId = CompanyContext.get();
// 1. source_data.status → PREPROCESSING
sourceDataMapper.updateStatus(sourceId, "PREPROCESSING", companyId);
// 2. 创建 video_process_jobPENDING
VideoProcessJob job = VideoProcessJob.builder()
.companyId(companyId).sourceId(sourceId)
.jobType(jobType).params(params).status("PENDING").build();
videoProcessJobMapper.insert(job);
// 3. 异步调用 AI 服务(非阻塞,由 AI 服务自行回调)
triggerAiAsync(job);
return job;
}
// AI 服务回调:幂等处理
@Transactional
public void handleCallback(Long jobId, boolean success, String outputPath, String errorMsg) {
VideoProcessJob job = videoProcessJobMapper.selectById(jobId);
// 幂等:已是 SUCCESS 状态则静默忽略重复回调,不得重复创建 annotation_task
if ("SUCCESS".equals(job.getStatus())) return;
if (success) {
job.setStatus("SUCCESS");
job.setOutputPath(outputPath);
job.setCompletedAt(LocalDateTime.now());
// source_data.status → PENDING进入后续标注流程
sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
// ⚠️ 两种模式的后续任务创建ADMIN 看到 PENDING 后可手动创建标注任务)
if ("FRAME_EXTRACT".equals(job.getJobType())) {
// 帧模式AI 返回帧列表,每帧对应一个 annotation_taskphase=EXTRACTIONvideo_unit_type=FRAME
// outputPath 指向帧列表 JSON 文件(存 RustFS由 ADMIN 在 POST /api/tasks 时按帧创建任务
// 不在此处自动创建 EXTRACTION 任务——ADMIN 决定如何调度多帧任务
} else if ("VIDEO_TO_TEXT".equals(job.getJobType())) {
// 片段模式:派生 TEXT 类型 source_dataparent_source_id 指向原视频
// 不在此处创建——ADMIN 收到 PENDING 通知后手动通过 POST /api/tasks 为派生 TEXT 资料创建任务
// 派生 source_data 已在 triggerAiAsync() 中预创建status=PENDING
}
} else {
if (job.getRetryCount() >= job.getMaxRetries()) {
// 达最大重试次数,置 FAILEDsource_data → PENDING 保留 error_message需 ADMIN 手动重置
job.setStatus("FAILED");
job.setErrorMessage(errorMsg);
sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
} else {
job.setStatus("RETRYING");
job.setRetryCount(job.getRetryCount() + 1);
job.setErrorMessage(errorMsg);
}
}
videoProcessJobMapper.updateById(job);
}
```
---
## 五、状态机实现规范
[↑ 返回目录](#目录)
所有状态变更**必须**经过 `StateValidator.assertTransition()` 校验,禁止绕过直接调用 Mapper 更新状态字段。
### 5.1 StateValidator
```java
// com/label/common/statemachine/StateValidator.java
public final class StateValidator {
public static <S> void assertTransition(S from, S to, Map<S, Set<S>> transitions) {
Set<S> allowed = transitions.getOrDefault(from, Set.of());
if (!allowed.contains(to))
throw new BusinessException("INVALID_STATE_TRANSITION",
"非法状态转换: " + from + " → " + to);
}
}
```
### 5.2 source_data 状态机
```java
public enum SourceStatus {
PENDING, PREPROCESSING, EXTRACTING, QA_REVIEW, APPROVED, REJECTED;
public static final Map<SourceStatus, Set<SourceStatus>> TRANSITIONS = Map.of(
PENDING, Set.of(EXTRACTING, PREPROCESSING),
PREPROCESSING, Set.of(PENDING),
EXTRACTING, Set.of(QA_REVIEW),
QA_REVIEW, Set.of(APPROVED)
// ⚠️ source_data 不会进入 REJECTED 状态:
// QA 被驳回时 annotation_task → REJECTEDsource_data 保持 QA_REVIEW。
// 整条流水线唯一终态为 APPROVED。
);
}
```
### 5.3 annotation_task 状态机
```java
public enum TaskStatus {
UNCLAIMED, IN_PROGRESS, SUBMITTED, APPROVED, REJECTED;
public static final Map<TaskStatus, Set<TaskStatus>> TRANSITIONS = Map.of(
UNCLAIMED, Set.of(IN_PROGRESS),
IN_PROGRESS, Set.of(SUBMITTED, UNCLAIMED, IN_PROGRESS),
// IN_PROGRESS → IN_PROGRESS 用于 ADMIN 强制转移(持有人变更,状态不变)
SUBMITTED, Set.of(APPROVED, REJECTED),
REJECTED, Set.of(IN_PROGRESS) // 驳回后重拾
);
}
```
### 5.4 training_dataset 状态机
```java
public enum DatasetStatus {
PENDING_REVIEW, APPROVED, REJECTED;
public static final Map<DatasetStatus, Set<DatasetStatus>> TRANSITIONS = Map.of(
PENDING_REVIEW, Set.of(APPROVED, REJECTED),
REJECTED, Set.of(PENDING_REVIEW) // 驳回后可修改重提
);
}
```
### 5.5 video_process_job 状态机
```java
public enum VideoJobStatus {
PENDING, RUNNING, SUCCESS, FAILED, RETRYING;
public static final Map<VideoJobStatus, Set<VideoJobStatus>> TRANSITIONS = Map.of(
PENDING, Set.of(RUNNING),
RUNNING, Set.of(SUCCESS, RETRYING, FAILED),
RETRYING, Set.of(RUNNING, FAILED)
// FAILED → PENDING 由 ADMIN 手动触发接口,不在此处声明自动流转
);
}
```
---
## 六、Docker Compose 配置
[↑ 返回目录](#目录)
```yaml
# docker-compose.yml
version: "3.9"
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: label_db
POSTGRES_USER: label
POSTGRES_PASSWORD: label_password
volumes:
- postgres_data:/var/lib/postgresql/data
- ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U label -d label_db"]
interval: 10s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
command: redis-server --requirepass redis_password
volumes:
- redis_data:/data
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "-a", "redis_password", "ping"]
interval: 10s
timeout: 5s
retries: 5
rustfs:
image: rustfs/rustfs:latest # 替换为生产可用的实际镜像
environment:
RUSTFS_ACCESS_KEY: minioadmin
RUSTFS_SECRET_KEY: minioadmin
volumes:
- rustfs_data:/data
ports:
- "9000:9000" # S3 API 端口
- "9001:9001" # Web 控制台端口
backend:
build:
context: .
dockerfile: Dockerfile
environment:
SPRING_DATASOURCE_URL: jdbc:postgresql://postgres:5432/label_db
SPRING_DATASOURCE_USERNAME: label
SPRING_DATASOURCE_PASSWORD: label_password
SPRING_REDIS_HOST: redis
SPRING_REDIS_PORT: 6379
SPRING_REDIS_PASSWORD: redis_password
RUSTFS_ENDPOINT: http://rustfs:9000
RUSTFS_ACCESS_KEY: minioadmin
RUSTFS_SECRET_KEY: minioadmin
AI_SERVICE_BASE_URL: http://ai-service:8000
ports:
- "8080:8080"
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_healthy
rustfs:
condition: service_started
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
interval: 15s
retries: 5
ai-service:
build:
context: ../ai_service # Python FastAPI 服务所在目录
dockerfile: Dockerfile
environment:
RUSTFS_ENDPOINT: http://rustfs:9000
RUSTFS_ACCESS_KEY: minioadmin
RUSTFS_SECRET_KEY: minioadmin
ports:
- "8000:8000"
depends_on:
- rustfs
frontend:
image: nginx:alpine
volumes:
- ../frontend/dist:/usr/share/nginx/html:ro
- ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
ports:
- "80:80"
depends_on:
backend:
condition: service_healthy
volumes:
postgres_data:
redis_data:
rustfs_data:
```
**Dockerfilebackend**
```dockerfile
FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
COPY target/label-backend-*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
```
---
## 七、测试策略
[↑ 返回目录](#目录)
### 7.1 基本原则
- 集成测试**必须**使用真实的 PostgreSQL 和 Redis 实例Testcontainers禁止仅 Mock 数据库
- 每个受保护接口组**必须**有至少一个集成测试覆盖 Shiro 认证/鉴权过滤器链(缺少 Token → 401角色不足 → 403
### 7.2 并发任务领取测试(必须)
```java
@Test
void concurrentClaimShouldOnlySucceedOnce() throws InterruptedException {
// 准备:创建一个 UNCLAIMED 任务
Long taskId = createUnclaimedTask();
int threadCount = 10;
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger successCount = new AtomicInteger(0);
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
pool.submit(() -> {
try {
latch.await();
taskClaimService.claim(taskId);
successCount.incrementAndGet();
} catch (BusinessException e) {
// 预期:其余线程抛出"任务已被领取"
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
latch.countDown();
pool.awaitTermination(5, SECONDS);
// 验证:仅 1 个线程领取成功
assertThat(successCount.get()).isEqualTo(1);
AnnotationTask task = taskMapper.selectById(taskId);
assertThat(task.getStatus()).isEqualTo("IN_PROGRESS");
assertThat(task.getClaimedBy()).isNotNull();
}
```
### 7.3 视频回调幂等测试(必须)
```java
@Test
void duplicateSuccessCallbackShouldBeIdempotent() {
Long sourceId = createVideoSource();
VideoProcessJob job = videoProcessService.createJob(sourceId, "FRAME_EXTRACT", params);
// 第一次成功回调
videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);
// 重复成功回调
videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);
// 验证:只创建一个 annotation_task
long taskCount = annotationTaskMapper.countBySourceId(sourceId);
assertThat(taskCount).isEqualTo(1);
}
```
### 7.4 状态机越界拒绝测试
```java
@Test
void illegalStateTransitionShouldThrow() {
// 验证APPROVED → IN_PROGRESS 被拒绝
assertThatThrownBy(() ->
StateValidator.assertTransition(TaskStatus.APPROVED, TaskStatus.IN_PROGRESS, TaskStatus.TRANSITIONS)
).isInstanceOf(BusinessException.class)
.hasMessageContaining("非法状态转换");
}
```
### 7.5 多租户隔离测试
```java
@Test
void companyACannotAccessCompanyBData() {
Long sourceIdB = createSourceForCompany(companyB);
// 以 companyA 身份请求 companyB 的资源
CompanyContext.set(companyA);
assertThatThrownBy(() -> sourceService.findById(sourceIdB))
.isInstanceOf(BusinessException.class);
}
```
---
## 八、宪章合规检查清单
[↑ 返回目录](#目录)
PR 合并前评审人**必须**逐条核对以下清单:
| # | 宪章原则 | 实现位置 | 检查要点 |
|---|----------|----------|----------|
| 1 | 环境约束JDK 17、SB 3、Shiro、MyBatis Plus | `pom.xml` | 版本号符合约束;无 Spring Security 并行引入 |
| 2 | 多租户数据隔离 | `TenantLineInnerInterceptor``CompanyContext` | 所有 Mapper 自动注入 company_idThreadLocal 在 finally 块清理 |
| 3 | BCrypt 密码、UUID Token、滑动过期、禁 JWT | `AuthService``TokenFilter``RedisKeyManager` | 无明文密码存储;每次有效请求重置 TTL无 JWT 库引入 |
| 4 | 分级 RBAC、权限注解、角色变更驱逐缓存 | `UserRealm``@RequiresRoles``UserService` | 无 if-role 临时判断;`updateRole()` 立即删缓存 |
| 5 | 双流水线、级联触发 QA 任务、parent_source_id 溯源 | `ExtractionService.approve()` | 级联动作在同一 @Transactional 内;视频转文本的 parent_source_id 不为空 |
| 6 | 状态机完整性 | `StateValidator`、各 `*Status` 枚举 | 所有状态变更调用 `StateValidator`;无绕过直接写 Mapper 的路径 |
| 7 | 任务争抢双重保障 | `TaskClaimService.claim()` | Redis SET NX + DB WHERE status='UNCLAIMED' 两道并存 |
| 8 | 异步任务幂等、重试上限、FAILED 需手动重置 | `VideoProcessService.handleCallback()` | 重复成功回调静默忽略;`retry_count >= max_retries` 置 FAILED |
| 9 | 只追加审计日志、AOP 切面、审计失败不回滚业务 | `AuditAspect``@OperationLog` | `sys_operation_log` 无 UPDATE/DELETE审计异常仅 error 日志 |
| 10 | RESTful URL、统一响应格式、分页必须 | `Result<T>`、各 Controller | 无动词路径;无裸 POJO 返回;所有列表接口有分页参数 |
| 11 | YAGNI业务在 ServiceController 只处理 HTTP | 包结构 | Controller 无业务判断逻辑;无未使用的抽象层 |
---
## 九、设计评审报告
[↑ 返回目录](#目录)
**评审日期**2026-04-09 | **评审方式**:独立子 Agent 五维评审 + 人工复核 | **初始质量评分**6.5/10 → 修复后:**8.5/10**
### 9.1 发现并已修复的问题
| # | 严重级 | 位置 | 问题描述 | 修复内容 |
|---|--------|------|----------|----------|
| A | CRITICAL | §4.3 任务池接口 | `GET /api/tasks/pool` 的角色过滤逻辑不明确,开发者无法判断各角色看到哪些任务 | 在接口说明中补充ANNOTATOR 看 EXTRACTION-UNCLAIMEDREVIEWER 看两类 UNCLAIMEDADMIN 走全量接口 |
| B | HIGH | §4.4、§4.5 审批接口 | approve/reject 接口未声明"提交人不能自审"约束,存在质量绕过风险 | 在 `approve()``reject()` 代码示例中加入自审防护检查:`if (task.getClaimedBy().equals(getCurrentUserId())) throw` |
| C | HIGH | §4.8 视频处理 | 视频处理模块完全缺失 REST API 接口清单,帧模式/片段模式成功后的后续任务创建分支不清晰 | 补充 5 条接口(含 ADMIN-only 权限),在 handleCallback() 中明确两种模式的处理路径差异 |
| D | MEDIUM | §4.4 | 缺少 `ExtractionService.reject()` 代码示例,驳回后 source_data 状态不明确 | 补充 reject() 完整实现;说明 source_data 保持 EXTRACTING 等待标注员重提 |
| E | MEDIUM | §4.5 | 缺少 `QaService.reject()` 代码示例training_dataset 驳回后状态流转不清晰 | 补充 reject()training_dataset → REJECTED重提后 → PENDING_REVIEW |
| F | LOW | §4.3 | reassign 接口说明过于简洁,未说明 annotation_task_history 写入规则 | 补充说明:保持 IN_PROGRESShistory 写入 from=IN_PROGRESS/to=IN_PROGRESS + note |
### 9.2 未修复的设计选择(需业务澄清)
| # | 问题 | 背景 | 建议 |
|---|------|------|------|
| G | 原需求提到"若任务池中无空闲审批员,由标注员兼任审批" | 宪章要求 REVIEWER ⊃ ANNOTATOR 角色继承,系统无法动态提升 ANNOTATOR 权限 | **建议取消此机制**:统一要求 QA 审批必须由 REVIEWER 或 ADMIN 执行;若无可用 REVIEWER由 ADMIN 代为审批。若需保留,需修改宪章并引入动态权限提升机制(超出当前架构范围) |
| H | `GET /api/tasks/{id}` 的精细访问控制 | ANNOTATOR 是否只能查看自己的任务REJECTED 任务是否对原领取人可见? | 当前设计依赖 `company_id` 隔离,建议 Service 层加 `claimed_by = currentUserId OR role >= REVIEWER` 的访问控制检查 |
### 9.3 修复后宪章合规状态
| 宪章原则 | 修复前 | 修复后 |
|----------|--------|--------|
| 四、分级 RBAC权限注解声明 | ✅ 基本覆盖 | ✅ 补充了自审防护,任务池过滤规则完整 |
| 五、双标注流水线(级联触发规则) | ⚠️ reject 路径缺失 | ✅ reject() 示例补全source_data 回退路径明确 |
| 六、状态机完整性 | ⚠️ QA reject 状态转换不明 | ✅ training_dataset REJECTED → PENDING_REVIEW 完整 |
| 八、异步任务处理(幂等回调) | ⚠️ 两种视频模式处理分支缺失 | ✅ handleCallback 分支注释明确 |
---
### 9.4 审批流程合理性专项评审(第二轮)
**评审日期**2026-04-09 | **评审焦点**:审批工作流是否可被实际执行
#### 9.4.1 发现的问题
| # | 严重级 | 位置 | 问题描述 | 根因 | 建议修复 |
|---|--------|------|----------|------|----------|
| I | CRITICAL | §4.3、§4.4、§4.5 | **REVIEWER 无审批收件箱**`GET /api/tasks/pool` 仅返回 UNCLAIMED 任务REVIEWER 无法发现哪些任务处于 SUBMITTED 状态等待审批。整个审批流程在 API 层面缺失入口。 | 任务池设计仅服务于"领取"场景,未考虑"审批"场景 | 新增接口:`GET /api/tasks/pending-review`(权限 REVIEWER返回 `phase=EXTRACTION OR QA_GENERATION AND status=SUBMITTED` 的任务列表分页ADMIN 可查全部 |
| J | HIGH | §4.4 `ExtractionService.approve()` | **@Transactional 内同步调用 AI 服务**`generateQa()` 在事务中发起 HTTP 请求,可能耗时 530 秒。将导致:① DB 连接被长期占用(连接池耗尽风险);② AI 调用失败会回滚已完成的审批动作(标注员和审批员的工作丢失);③ 审批接口响应时间不可预期。 | 审批与 QA 生成未解耦 | **两阶段拆分**`approve()` 事务只完成步骤 13is_final、APPROVED、history然后发布 `ExtractionApprovedEvent``QaGenerationListener` 异步消费该事件,完成步骤 47AI 调用、生成 QA、创建 QA 任务、更新 source_data。若 AI 调用失败审批结果不回滚QA 任务可重试 |
| K | HIGH | §4.3、§4.4、§4.5 | **REJECTED 任务重拾路径未实现**:驳回后 task.status = REJECTED标注员需"重新 claim",但:① `GET /api/tasks/pool` 仅展示 UNCLAIMED 任务REJECTED 任务不可见;② 没有 `POST /api/tasks/{id}/reclaim` 接口;③ 状态机定义了 `REJECTED → IN_PROGRESS` 合法转换,但无 API 触发它。被驳回的标注员无任何操作路径。 | 驳回后续路径仅在状态机中声明,未转化为接口 | 方案一:修改 `GET /api/tasks/mine` 包含 REJECTED 状态,新增 `POST /api/tasks/{id}/reclaim` 接口(权限 ANNOTATOR状态机校验 REJECTED → IN_PROGRESS设置 status=IN_PROGRESS。方案二驳回时直接将 task 置为 UNCLAIMED退回公共任务池原领取人或他人均可重新领取。**推荐方案一**(保留原领取人优先权,避免任务被他人抢占) |
| L | MEDIUM | §4.5 `QaService.approve()` | **重复变量声明(编译错误)**:方法内 `task` 变量被声明两次(第 1057 行与第 1065 行Java 不允许同作用域内重复声明,此代码无法编译通过。 | 复制粘贴失误 | 删除第 1065 行的 `AnnotationTask task =`,直接使用第 1057 行已声明的 `task` 变量 |
| M | MEDIUM | §5.2 `SourceStatus` 状态机 | **source_data.REJECTED 状态不可达**:状态机声明 `QA_REVIEW → REJECTED` 合法,但 `QaService.reject()` 明确注释"source_data 保持 QA_REVIEW"从不触发此转换。REJECTED 是死状态,状态机与实现不一致,可能误导实现者。 | 状态机定义未与实现对齐 | 从 `SourceStatus.TRANSITIONS` 中移除 `QA_REVIEW → REJECTED` 转换并在注释中说明source_data 不会进入 REJECTEDQA 被驳回时 source_data 保持 QA_REVIEW仅 annotation_task 进入 REJECTED |
#### 9.4.2 审批工作流完整路径(修复后期望状态)
```
EXTRACTION 阶段:
ANNOTATOR 领取(/api/tasks/{id}/claim
→ 标注工作台提交(/api/extraction/{taskId}/submit
→ REVIEWER 从审批收件箱发现(/api/tasks/pending-review
→ 审批通过(/api/extraction/{taskId}/approve
→ [异步] AI 生成 QA → 创建 QA_GENERATION 任务 → source_data → QA_REVIEW
→ 审批驳回(/api/extraction/{taskId}/reject
→ 标注员从"我的任务"看到 REJECTED 任务
→ 重拾(/api/tasks/{id}/reclaim→ IN_PROGRESS → 修改后重提
QA_GENERATION 阶段:
ANNOTATOR/REVIEWER 领取(/api/tasks/{id}/claim
→ 问答对修改提交(/api/qa/{taskId}/submit
→ REVIEWER 从审批收件箱发现(/api/tasks/pending-review
→ 审批通过(/api/qa/{taskId}/approve
→ training_dataset → APPROVED → source_data → APPROVED流水线终态
→ 审批驳回(/api/qa/{taskId}/reject
→ training_dataset → REJECTEDsource_data 保持 QA_REVIEW
→ 标注员重拾(/api/tasks/{id}/reclaim→ 修改后重提
```
#### 9.4.3 需立即修复的条目
| 问题 | 修复位置 | 修复类型 |
|------|----------|----------|
| I无审批收件箱 | §4.3 接口清单 | 新增接口 `GET /api/tasks/pending-review` |
| J@Transactional 内 AI 调用) | §4.4 ExtractionService.approve() | 拆分为两阶段,步骤 4-7 异步化 |
| KREJECTED 重拾路径缺失) | §4.3 接口清单 + §4.4/4.5 说明 | 新增接口 `POST /api/tasks/{id}/reclaim``GET /api/tasks/mine` 包含 REJECTED |
| L重复变量声明 | §4.5 QaService.approve() 代码 | 删除第二个 `AnnotationTask task =` 声明 |
| Msource_data 死状态) | §5.2 SourceStatus 状态机 | 移除 `QA_REVIEW → REJECTED` 转换 |
---
### 9.5 数据库表设计完善性专项评审(第三轮)
**评审日期**2026-04-09 | **评审焦点**DDL 完整性、约束完备性、索引覆盖
#### 9.5.1 发现的问题(共 10 项)
| # | 严重级 | 表 | 问题描述 | 修复方案 |
|---|--------|-----|----------|----------|
| N | CRITICAL | `sys_config` | **全局唯一约束失效**`UNIQUE (company_id, config_key)` 在 PostgreSQL 中NULL 不与任何值相等(含另一个 NULL导致 `company_id=NULL` 的行可重复插入相同 config_key。两条 `(NULL, 'model_default')` 均可写入,破坏全局配置覆盖语义。 | 将单一 UNIQUE 约束改为两个局部唯一索引(见下方 DDL 修复) |
| O | CRITICAL | `annotation_result` | **缺少 UNIQUE(task_id)**`selectByTaskId()` 假设每个任务只有一条结果1:1但无唯一约束保证。高并发重复调用 `aiPreAnnotate()` 可能插入多行,导致查询时返回多行引发异常。 | 添加 `UNIQUE (task_id)` 约束 |
| P | CRITICAL | `training_dataset` | **`export_batch_id` 无引用完整性**:字段类型为 VARCHAR(50) 引用 `export_batch.batch_uuid`,但无 FK 约束。可写入不存在的批次 UUID导出查询时出现幽灵引用。 | 改为 `export_batch_id BIGINT REFERENCES export_batch(id)`;对应代码改为存储 `export_batch.id` |
| Q | HIGH | 全部业务表 | **无 CHECK 约束保护枚举字段**`sys_user.role``annotation_task.status``annotation_task.phase``source_data.status``training_dataset.status` 等均为裸 VARCHAR代码 bug 或 DBA 手动 SQL 可写入 `'APPROVEEED'` 等非法值,状态机仅在应用层有效。 | 在每张表上为枚举字段添加 CHECK 约束(见下方 DDL 修复) |
| R | HIGH | `annotation_task_history` | **缺少 `operator_name` 快照**:当前只有 `operator_id`FK`operator_role` 快照。若用户被禁用或删除,历史记录无法追溯操作人姓名——与 `sys_operation_log` 的快照设计不一致。 | 新增 `operator_name VARCHAR(50) NOT NULL` 字段,写入时从当前 UserSession 读取用户名 |
| S | MEDIUM | `annotation_task` | **缺少 `(company_id, source_id)` 索引**:审批通过后查询"该 source 的全部任务"(用于 source 详情页),以及 `ExtractionService.approve()` 获取 source 类型,都需要按 source_id 查询。现有索引不覆盖此路径。 | 新增 `CREATE INDEX idx_annotation_task_source ON annotation_task(company_id, source_id)` |
| T | MEDIUM | `training_dataset` | **缺少 `task_id` 索引**`approveByTaskId``rejectByTaskId``selectByTaskId` 均按 task_id 查询,但表上只有 `(company_id, status)``export_batch_id` 索引,无 task_id 索引。每次审批都需全表扫描(或依赖 FK 扫描)。 | 新增 `CREATE INDEX idx_training_dataset_task ON training_dataset(task_id)` |
| U | LOW | `sys_user` | **缺少 `created_by` 字段**ADMIN 创建用户时无字段记录操作者,需查 `sys_operation_log` 才能溯源。表自身审计不完整。 | 新增 `created_by BIGINT REFERENCES sys_user(id)`(可为 NULL系统初始化时无上级|
| V | LOW | `source_data` | **缺少 `mime_type` 字段**:文件服务需要设置 Content-Type 响应头,当前只能从 `file_name` 后缀推断,对无扩展名或错误扩展名文件不可靠。 | 新增 `mime_type VARCHAR(100)` 字段,上传时由后端探测或由客户端提供 |
| W | LOW | 全部有 `updated_at` 的表 | **`updated_at` 无自动更新触发器**:所有表依赖应用层手动 `updated_at = NOW()`,遗漏时字段永远停在创建时间,无法用于数据变更监控。 | 为每张有 `updated_at` 的表创建 `UPDATE` 触发器(见下方 DDL 修复) |
#### 9.5.2 DDL 修复补丁(已直接修订 §2
以下修复直接追加到 §2 DDL 末尾,作为补充 DDL
```sql
-- =============================================
-- DDL 修复补丁(依据 §9.5 评审结论)
-- =============================================
-- N. sys_config修复全局配置键唯一约束PostgreSQL NULL != NULL 问题)
-- 删除原有约束,改为两个局部唯一索引
ALTER TABLE sys_config DROP CONSTRAINT IF EXISTS uq_config_company_key;
-- 全局配置company_id IS NULL 时config_key 唯一
CREATE UNIQUE INDEX uq_config_global_key
ON sys_config(config_key) WHERE company_id IS NULL;
-- 公司配置:同一公司内 config_key 唯一
CREATE UNIQUE INDEX uq_config_company_key
ON sys_config(company_id, config_key) WHERE company_id IS NOT NULL;
-- O. annotation_result每个 task 只能有一条结果
ALTER TABLE annotation_result ADD CONSTRAINT uq_annotation_result_task UNIQUE (task_id);
-- P. training_datasetexport_batch_id 改为 BIGINT FK
-- (新建时使用,存量系统迁移需额外脚本)
ALTER TABLE training_dataset
ADD COLUMN export_batch_fk BIGINT REFERENCES export_batch(id);
-- 注意export_batch_id VARCHAR(50) 字段保留用于兼容过渡,实现层改为写 export_batch_fk
-- Q. CHECK 约束:关键枚举字段
ALTER TABLE sys_user
ADD CONSTRAINT chk_user_role CHECK (role IN ('UPLOADER','ANNOTATOR','REVIEWER','ADMIN')),
ADD CONSTRAINT chk_user_status CHECK (status IN ('ACTIVE','DISABLED'));
ALTER TABLE source_data
ADD CONSTRAINT chk_source_type CHECK (data_type IN ('TEXT','IMAGE','VIDEO')),
ADD CONSTRAINT chk_source_status CHECK (status IN ('PENDING','PREPROCESSING','EXTRACTING','QA_REVIEW','APPROVED','REJECTED'));
ALTER TABLE annotation_task
ADD CONSTRAINT chk_task_phase CHECK (phase IN ('EXTRACTION','QA_GENERATION')),
ADD CONSTRAINT chk_task_status CHECK (status IN ('UNCLAIMED','IN_PROGRESS','SUBMITTED','APPROVED','REJECTED')),
ADD CONSTRAINT chk_task_type CHECK (task_type IN ('AI_ASSISTED','MANUAL'));
ALTER TABLE training_dataset
ADD CONSTRAINT chk_dataset_type CHECK (sample_type IN ('TEXT','IMAGE','VIDEO_FRAME')),
ADD CONSTRAINT chk_dataset_status CHECK (status IN ('PENDING_REVIEW','APPROVED','REJECTED'));
ALTER TABLE video_process_job
ADD CONSTRAINT chk_job_type CHECK (job_type IN ('FRAME_EXTRACT','VIDEO_TO_TEXT')),
ADD CONSTRAINT chk_job_status CHECK (status IN ('PENDING','RUNNING','SUCCESS','FAILED','RETRYING'));
-- R. annotation_task_history补充 operator_name 快照字段
ALTER TABLE annotation_task_history
ADD COLUMN operator_name VARCHAR(50);
-- 历史存量行 operator_name 置为 '(unknown)',新增行必须填写
UPDATE annotation_task_history SET operator_name = '(unknown)' WHERE operator_name IS NULL;
ALTER TABLE annotation_task_history ALTER COLUMN operator_name SET NOT NULL;
-- S. annotation_task补充 source_id 查询索引
CREATE INDEX idx_annotation_task_source ON annotation_task(company_id, source_id);
-- T. training_dataset补充 task_id 查询索引
CREATE INDEX idx_training_dataset_task ON training_dataset(task_id);
-- U. sys_user补充 created_by
ALTER TABLE sys_user ADD COLUMN created_by BIGINT REFERENCES sys_user(id);
-- V. source_data补充 mime_type
ALTER TABLE source_data ADD COLUMN mime_type VARCHAR(100);
-- W. updated_at 自动更新触发器(以 annotation_task 为例,其余表同理)
CREATE OR REPLACE FUNCTION set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 为所有有 updated_at 的表创建触发器
DO $$
DECLARE
tbl TEXT;
BEGIN
FOREACH tbl IN ARRAY ARRAY[
'sys_company','sys_user','source_data','annotation_task',
'annotation_result','training_dataset','export_batch','sys_config','video_process_job'
] LOOP
EXECUTE format(
'CREATE TRIGGER trg_%I_updated_at
BEFORE UPDATE ON %I
FOR EACH ROW EXECUTE FUNCTION set_updated_at()',
tbl, tbl
);
END LOOP;
END;
$$;
```
#### 9.5.3 需业务确认的事项
| # | 问题 | 背景 | 建议 |
|---|------|------|------|
| X | `training_dataset` 每个 QA 任务对应几条记录? | 一次 QA 任务可能产出多个问答对(一份文档提取了 5 个三元组,就有 5 个 QA 对)。如果是多条,`task_id` 索引应采用普通索引而非唯一索引。 | 确认多对一(一个 task → 多条 training_dataset`task_id` 只建普通索引,`approveByTaskId`/`rejectByTaskId` 批量更新逻辑不变 |
| Y | `export_batch.dataset_file_path` 是否应为 NOT NULL | 当前为 nullable批次先创建再上传文件后更新路径。如果上传 RustFS 和 INSERT export_batch 在同一事务内,可直接 NOT NULL。 | 如能保证原子性,将 `dataset_file_path` 改为 NOT NULL防止空路径记录存在 |