From e38299571876e2cd57f41a64c75aaf0fe62a9566 Mon Sep 17 00:00:00 2001 From: wh Date: Thu, 9 Apr 2026 11:34:31 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E5=AE=A1=E6=89=B9=E6=B5=81=E7=A8=8B?= =?UTF-8?q?=E5=90=88=E7=90=86=E6=80=A7=E4=B8=93=E9=A1=B9=E8=AF=84=E5=AE=A1?= =?UTF-8?q?=EF=BC=88=E7=AC=AC=E4=BA=8C=E8=BD=AE=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 §9.4 审批流程合理性专项评审,5 项问题(I–M) - 新增 GET /api/tasks/pending-review(REVIEWER 审批收件箱) - 新增 POST /api/tasks/{id}/reclaim(REJECTED 任务重拾) - GET /api/tasks/mine 说明补充:包含 REJECTED 状态 - ExtractionService.approve() 重构为两阶段:同步审批 + 异步 AI 调用(发布 ExtractionApprovedEvent) - 修复 QaService.approve() 重复变量声明(编译错误) - 修复 SourceStatus 状态机:移除不可达的 QA_REVIEW → REJECTED 转换 --- .../specs/2026-04-09-label-backend-design.md | 1635 +++++++++++++++++ 1 file changed, 1635 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-09-label-backend-design.md diff --git a/docs/superpowers/specs/2026-04-09-label-backend-design.md b/docs/superpowers/specs/2026-04-09-label-backend-design.md new file mode 100644 index 0000000..0104c9c --- /dev/null +++ b/docs/superpowers/specs/2026-04-09-label-backend-design.md @@ -0,0 +1,1635 @@ +# label_backend 开发实施指南 + +**版本:1.0.0 | 日期:2026-04-09 | 依据:backend后台设计.md v2.2 + constitution.md v1.1.0** + +--- + +## 一、项目总览 + +### 1.1 系统定位 + +label_backend 是知识图谱智能标注平台的后端服务,基于 Spring Boot 3 构建,驱动**文本线**和**图片线**两条标注流水线,将文档、图片、视频原始资料处理为 GLM 微调格式的训练数据集。 + +### 1.2 数据流水线 + +``` +文本线: 文档 → 三元组提取(主语/谓语/宾语 + 原文片段 + 字符偏移) + → 问答对生成 → 审批 → 训练样本(GLM 文本格式) + +图片线: 图片 → 四元组提取(主体/关系/客体/修饰词 + bbox + 裁剪图) + → 问答对生成 → 审批 → 训练样本(GLM 图文格式) + +视频预处理(非独立流水线): + 帧模式: 视频 → AI 抽帧 → 每帧作为图片进入图片线 + 片段模式: 视频 → 多模态模型转文本 → 派生 TEXT source_data → 进入文本线 +``` + +### 1.3 技术栈矩阵 + +| 层次 | 技术 | 版本约束 | +|------|------|----------| +| 运行时 | JDK | 17(LTS) | +| 框架 | Spring Boot | ≥ 3.0.x | +| 认证/鉴权 | Apache Shiro | ≥ 1.13.x(兼容 Spring Boot 3) | +| ORM | MyBatis Plus | ≥ 3.5.x | +| 数据库 | PostgreSQL | ≥ 14 | +| 缓存/锁 | Redis | ≥ 6.x | +| 对象存储 | RustFS(S3 兼容接口) | 当前稳定版 | +| AI 服务 | Python FastAPI(HTTP 调用) | JVM 侧仅作 RestClient 调用 | +| 容器化 | Docker Compose | ≥ 2.x | +| 构建工具 | Maven | ≥ 3.8 | + +### 1.4 后端模块清单 + +| 模块 | 职责 | +|------|------| +| 用户与权限模块 | 用户管理、Shiro RBAC、Token 认证 | +| 资料管理模块 | 文件上传至 RustFS、source_data 元数据管理 | +| 任务池模块 | 任务创建、领取(分布式锁 + 乐观锁)、状态流转 | +| 标注工作台模块 | 调用 AI 提取三/四元组、annotation_result JSONB 写入 | +| 问答生成模块 | 调用 AI 生成候选问答对、training_dataset 管理 | +| 训练数据导出模块 | JSONL 批次导出、GLM 微调对接 | +| 系统配置模块 | Prompt 模板、模型参数、全局/公司级配置管理 | +| 视频处理模块 | 异步抽帧 / 转文本任务跟踪、幂等回调处理 | + +### 1.5 包结构 + +``` +com.label +├── LabelBackendApplication.java +├── common/ +│ ├── result/ # Result、ResultCode、PageResult +│ ├── exception/ # BusinessException、GlobalExceptionHandler +│ ├── context/ # CompanyContext(ThreadLocal) +│ ├── shiro/ # TokenFilter、UserRealm、ShiroConfig +│ ├── redis/ # RedisKeyManager、RedisService +│ ├── aop/ # AuditAspect、@OperationLog 注解 +│ ├── storage/ # RustFsClient(S3 兼容封装) +│ ├── ai/ # AiServiceClient(RestClient 封装 8 个端点) +│ └── statemachine/ # StateValidator、各状态枚举 +├── module/ +│ ├── user/ +│ │ ├── controller/ # AuthController、UserController +│ │ ├── service/ # AuthService、UserService +│ │ ├── mapper/ # SysUserMapper、SysCompanyMapper +│ │ ├── entity/ # SysUser、SysCompany +│ │ └── dto/ # LoginRequest、UserDTO、UserCreateRequest +│ ├── source/ +│ │ ├── controller/ # SourceController +│ │ ├── service/ # SourceService +│ │ ├── mapper/ # SourceDataMapper +│ │ ├── entity/ # SourceData +│ │ └── dto/ # SourceUploadResponse、SourceListQuery +│ ├── task/ +│ │ ├── controller/ # TaskController +│ │ ├── service/ # TaskService、TaskClaimService +│ │ ├── mapper/ # AnnotationTaskMapper、TaskHistoryMapper +│ │ ├── entity/ # AnnotationTask、AnnotationTaskHistory +│ │ └── dto/ # TaskCreateRequest、TaskPoolQuery、TaskDetailDTO +│ ├── annotation/ +│ │ ├── controller/ # ExtractionController、QaController +│ │ ├── service/ # ExtractionService、QaService +│ │ ├── mapper/ # AnnotationResultMapper、TrainingDatasetMapper +│ │ ├── entity/ # AnnotationResult、TrainingDataset +│ │ └── dto/ # ExtractionResultDTO、QaItemDTO、RejectRequest +│ ├── export/ +│ │ ├── controller/ # ExportController +│ │ ├── service/ # ExportService、FinetuneService +│ │ ├── mapper/ # ExportBatchMapper +│ │ ├── entity/ # ExportBatch +│ │ └── dto/ # ExportBatchRequest、FinetuneRequest +│ ├── config/ +│ │ ├── controller/ # SysConfigController +│ │ ├── service/ # SysConfigService +│ │ ├── mapper/ # SysConfigMapper +│ │ ├── entity/ # SysConfig +│ │ └── dto/ # ConfigUpdateRequest +│ └── video/ +│ ├── controller/ # VideoController +│ ├── service/ # VideoProcessService +│ ├── mapper/ # VideoProcessJobMapper +│ ├── entity/ # VideoProcessJob +│ └── dto/ # VideoProcessRequest、VideoCallbackRequest +``` + +--- + +## 二、数据库 DDL + +> 执行顺序:sys_company → sys_user → source_data → annotation_task → annotation_result → training_dataset → export_batch → sys_config → sys_operation_log → annotation_task_history → video_process_job + +```sql +-- ============================================= +-- 1. sys_company — 公司表(多租户根节点) +-- ============================================= +CREATE TABLE sys_company ( + id BIGSERIAL PRIMARY KEY, + company_name VARCHAR(100) NOT NULL UNIQUE, + company_code VARCHAR(50) NOT NULL UNIQUE, + status VARCHAR(10) NOT NULL DEFAULT 'ACTIVE', -- ACTIVE / DISABLED + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +-- ============================================= +-- 2. sys_user — 用户表 +-- ============================================= +CREATE TABLE sys_user ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT NOT NULL REFERENCES sys_company(id), + username VARCHAR(50) NOT NULL, + password_hash VARCHAR(255) NOT NULL, -- BCrypt,强度因子 >= 10 + real_name VARCHAR(50), + role VARCHAR(20) NOT NULL, -- UPLOADER / ANNOTATOR / REVIEWER / ADMIN + status VARCHAR(10) NOT NULL DEFAULT 'ACTIVE', + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + CONSTRAINT uq_company_username UNIQUE (company_id, username) +); +CREATE INDEX idx_sys_user_company ON sys_user(company_id); + +-- ============================================= +-- 3. source_data — 原始资料元数据表 +-- ============================================= +CREATE TABLE source_data ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT NOT NULL REFERENCES sys_company(id), + uploader_id BIGINT REFERENCES sys_user(id), + data_type VARCHAR(20) NOT NULL, -- TEXT / IMAGE / VIDEO + file_path VARCHAR(500) NOT NULL, + file_name VARCHAR(255) NOT NULL, + file_size BIGINT, + bucket_name VARCHAR(100) NOT NULL, + parent_source_id BIGINT REFERENCES source_data(id), -- 视频转文本时指向原视频 + status VARCHAR(20) NOT NULL DEFAULT 'PENDING', + -- PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED / REJECTED + reject_reason TEXT, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_source_data_company ON source_data(company_id); +CREATE INDEX idx_source_data_status ON source_data(company_id, status); +CREATE INDEX idx_source_data_parent ON source_data(parent_source_id); + +-- ============================================= +-- 4. annotation_task — 标注任务表 +-- ============================================= +CREATE TABLE annotation_task ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT NOT NULL REFERENCES sys_company(id), + source_id BIGINT NOT NULL REFERENCES source_data(id), + phase VARCHAR(20) NOT NULL, -- EXTRACTION / QA_GENERATION + task_type VARCHAR(20) NOT NULL, -- AI_ASSISTED / MANUAL + ai_model VARCHAR(50), + video_unit_type VARCHAR(20), -- FRAME(仅视频帧模式填写,其余为空) + video_unit_info JSONB, -- {"frame_index":150,"time_sec":5.0,"frame_path":"..."} + claimed_by BIGINT REFERENCES sys_user(id), + claimed_at TIMESTAMP, + status VARCHAR(20) NOT NULL DEFAULT 'UNCLAIMED', + -- UNCLAIMED / IN_PROGRESS / SUBMITTED / APPROVED / REJECTED + reject_reason TEXT, + submitted_at TIMESTAMP, + completed_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_annotation_task_company ON annotation_task(company_id); +CREATE INDEX idx_annotation_task_pool ON annotation_task(company_id, phase, status); +CREATE INDEX idx_annotation_task_claimed ON annotation_task(claimed_by, status); + +-- ============================================= +-- 5. annotation_result — 标注结果表(EXTRACTION 阶段,JSONB 聚合) +-- ============================================= +CREATE TABLE annotation_result ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT NOT NULL REFERENCES sys_company(id), + task_id BIGINT NOT NULL REFERENCES annotation_task(id), + result_json JSONB NOT NULL, -- 整体覆盖,禁止局部 PATCH + is_final BOOLEAN NOT NULL DEFAULT FALSE, + submitted_by BIGINT REFERENCES sys_user(id), + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_annotation_result_task ON annotation_result(task_id); +CREATE INDEX idx_annotation_result_final ON annotation_result(company_id, is_final); + +-- ============================================= +-- 6. training_dataset — 训练样本表(问答对终态) +-- ============================================= +CREATE TABLE training_dataset ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT NOT NULL REFERENCES sys_company(id), + task_id BIGINT NOT NULL REFERENCES annotation_task(id), + source_id BIGINT NOT NULL REFERENCES source_data(id), + extraction_result_id BIGINT NOT NULL REFERENCES annotation_result(id), + sample_type VARCHAR(20) NOT NULL, -- TEXT / IMAGE / VIDEO_FRAME + glm_format_json JSONB NOT NULL, -- GLM 微调格式,JSONB 支持字段级查询 + export_batch_id VARCHAR(50), -- 未导出时为空 + status VARCHAR(20) NOT NULL DEFAULT 'PENDING_REVIEW', + -- PENDING_REVIEW / APPROVED / REJECTED + reject_reason TEXT, + reviewed_by BIGINT REFERENCES sys_user(id), + exported_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_training_dataset_company ON training_dataset(company_id); +CREATE INDEX idx_training_dataset_status ON training_dataset(company_id, status); +CREATE INDEX idx_training_dataset_batch ON training_dataset(export_batch_id); + +-- ============================================= +-- 7. export_batch — 导出批次表 +-- ============================================= +CREATE TABLE export_batch ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT NOT NULL REFERENCES sys_company(id), + batch_uuid VARCHAR(50) NOT NULL UNIQUE, + dataset_file_path VARCHAR(500), + sample_count INT NOT NULL DEFAULT 0, + glm_job_id VARCHAR(100), -- 调用微调接口后填写,初始为 NULL + finetune_status VARCHAR(20) NOT NULL DEFAULT 'NOT_STARTED', + -- NOT_STARTED / RUNNING / SUCCESS / FAILED + error_message TEXT, + created_by BIGINT REFERENCES sys_user(id), + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_export_batch_company ON export_batch(company_id); + +-- ============================================= +-- 8. sys_config — 系统配置表(支持全局默认 + 公司级覆盖) +-- ============================================= +CREATE TABLE sys_config ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT REFERENCES sys_company(id), -- NULL = 全局默认配置 + config_key VARCHAR(100) NOT NULL, + config_value TEXT NOT NULL, + description TEXT, + updated_by BIGINT REFERENCES sys_user(id), + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + CONSTRAINT uq_config_company_key UNIQUE (company_id, config_key) +); + +-- 预置全局配置项 +INSERT INTO sys_config (company_id, config_key, config_value, description) VALUES +(NULL, 'prompt_extract_text', '...', '文本三元组提取 Prompt 模板'), +(NULL, 'prompt_extract_image', '...', '图像四元组提取 Prompt 模板'), +(NULL, 'prompt_video_to_text', '...', '视频转文本 Prompt 模板'), +(NULL, 'prompt_qa_gen_text', '...', '文本问答对生成 Prompt 模板'), +(NULL, 'prompt_qa_gen_image', '...', '图像问答对生成 Prompt 模板'), +(NULL, 'model_default', 'glm-4', '默认 AI 辅助模型'), +(NULL, 'video_frame_interval', '30', '视频帧模式抽帧间隔(帧数)'), +(NULL, 'token_ttl_seconds', '7200', 'Token 有效期(秒)'), +(NULL, 'glm_api_base_url', 'http://ai-service:8000', 'GLM API 地址'); + +-- ============================================= +-- 9. sys_operation_log — 操作审计日志(只追加,按月分区) +-- ============================================= +CREATE TABLE sys_operation_log ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT REFERENCES sys_company(id), + operator_id BIGINT REFERENCES sys_user(id), -- 登录失败时可为 NULL + operator_name VARCHAR(50) NOT NULL, -- 操作时用户名快照 + operation_type VARCHAR(50) NOT NULL, + target_type VARCHAR(30), + target_id BIGINT, + detail JSONB, + ip_address VARCHAR(50), + result VARCHAR(10) NOT NULL, -- SUCCESS / FAIL + error_message TEXT, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +) PARTITION BY RANGE (created_at); + +-- 初始分区(建议使用 pg_partman 自动维护) +CREATE TABLE sys_operation_log_2026_04 + PARTITION OF sys_operation_log + FOR VALUES FROM ('2026-04-01') TO ('2026-05-01'); + +CREATE TABLE sys_operation_log_2026_05 + PARTITION OF sys_operation_log + FOR VALUES FROM ('2026-05-01') TO ('2026-06-01'); + +-- ============================================= +-- 10. annotation_task_history — 任务流转历史(只追加) +-- ============================================= +CREATE TABLE annotation_task_history ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT NOT NULL REFERENCES sys_company(id), + task_id BIGINT NOT NULL REFERENCES annotation_task(id), + from_status VARCHAR(20), -- 任务初建时为 NULL + to_status VARCHAR(20) NOT NULL, + operator_id BIGINT NOT NULL REFERENCES sys_user(id), + operator_role VARCHAR(20) NOT NULL, -- 操作时角色快照 + note TEXT, -- 驳回原因、强制转移说明等 + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_task_history_task ON annotation_task_history(task_id); + +-- ============================================= +-- 11. video_process_job — 视频异步处理任务表 +-- ============================================= +CREATE TABLE video_process_job ( + id BIGSERIAL PRIMARY KEY, + company_id BIGINT NOT NULL REFERENCES sys_company(id), + source_id BIGINT NOT NULL REFERENCES source_data(id), + job_type VARCHAR(20) NOT NULL, -- FRAME_EXTRACT / VIDEO_TO_TEXT + status VARCHAR(20) NOT NULL DEFAULT 'PENDING', + -- PENDING / RUNNING / SUCCESS / FAILED / RETRYING + params JSONB NOT NULL, + total_units INT, + processed_units INT NOT NULL DEFAULT 0, + output_path VARCHAR(500), + retry_count INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 3, + error_message TEXT, + started_at TIMESTAMP, + completed_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX idx_video_process_job_source ON video_process_job(source_id); +CREATE INDEX idx_video_process_job_status ON video_process_job(status); +``` + +--- + +## 三、公共基础设施 + +### 3.1 统一响应封装 + +```java +// com/label/common/result/Result.java +public record Result(String code, T data, String message) { + public static Result ok(T data) { + return new Result<>("SUCCESS", data, null); + } + public static Result ok() { + return new Result<>("SUCCESS", null, null); + } + public static Result fail(String code, String message) { + return new Result<>(code, null, message); + } +} + +// com/label/common/result/PageResult.java +public record PageResult(List items, long total, int page, int pageSize) {} +``` + +所有 Controller 返回 `Result`,禁止直接返回裸 POJO 或裸 List。分页接口返回 `Result>`。 + +### 3.2 全局异常处理 + +```java +// com/label/common/exception/GlobalExceptionHandler.java +@RestControllerAdvice +public class GlobalExceptionHandler { + + @ExceptionHandler(BusinessException.class) + public ResponseEntity> handleBusiness(BusinessException ex) { + return ResponseEntity.status(ex.getHttpStatus()) + .body(Result.fail(ex.getCode(), ex.getMessage())); + } + + @ExceptionHandler(UnauthorizedException.class) + public ResponseEntity> handleUnauthorized() { + return ResponseEntity.status(403) + .body(Result.fail("FORBIDDEN", "权限不足")); + } + + @ExceptionHandler(Exception.class) + public ResponseEntity> handleUnexpected(Exception ex, HttpServletRequest req) { + log.error("Unexpected error on {}", req.getRequestURI(), ex); + // 生产响应禁止包含堆栈跟踪 + return ResponseEntity.status(500) + .body(Result.fail("INTERNAL_ERROR", "服务器内部错误")); + } +} +``` + +### 3.3 多租户 CompanyContext(ThreadLocal) + +```java +// com/label/common/context/CompanyContext.java +public final class CompanyContext { + private static final ThreadLocal COMPANY_ID = new ThreadLocal<>(); + + public static void set(Long companyId) { COMPANY_ID.set(companyId); } + public static Long get() { return COMPANY_ID.get(); } + public static void clear() { COMPANY_ID.remove(); } // 必须在 finally 块调用 +} +``` + +在 Shiro `TokenFilter` 的 `executeLogin` 成功后注入;在过滤器的 `finally` 块调用 `CompanyContext.clear()`,防止线程池复用时数据串漏至其他租户。 + +**禁止**调用方通过请求体传入 `company_id` 作为参数。所有 Service 通过 `CompanyContext.get()` 获取。 + +MyBatis Plus 配置 `TenantLineInnerInterceptor` 自动向所有 Mapper 查询注入 `company_id = ?` 条件: + +```java +@Bean +public MybatisPlusInterceptor mybatisPlusInterceptor() { + MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); + interceptor.addInnerInterceptor(new TenantLineInnerInterceptor( + new TenantLineHandler() { + public Expression getTenantId() { + return new LongValue(CompanyContext.get()); + } + public String getTenantIdColumn() { return "company_id"; } + } + )); + return interceptor; +} +``` + +### 3.4 Shiro 配置 + +#### TokenFilter(继承 AuthenticatingFilter) + +```java +// com/label/common/shiro/TokenFilter.java +public class TokenFilter extends AuthenticatingFilter { + + @Override + protected AuthenticationToken createToken(ServletRequest req, ServletResponse res) { + String token = extractToken((HttpServletRequest) req); + return new TokenAuthenticationToken(token); + } + + @Override + protected boolean onAccessDenied(ServletRequest req, ServletResponse res) throws Exception { + String token = extractToken((HttpServletRequest) req); + if (token == null) { + sendUnauthorized(res, "缺少 Token"); + return false; + } + return executeLogin(req, res); + } + + @Override + protected boolean onLoginSuccess(AuthenticationToken token, Subject subject, + ServletRequest req, ServletResponse res) throws Exception { + UserSession session = (UserSession) subject.getPrincipal(); + // 滑动过期:每次有效请求重置 TTL + long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L); + redisTemplate.expire(RedisKeyManager.tokenKey(session.getToken()), ttl, SECONDS); + // 注入租户上下文(在整个 Filter 链执行期间有效,由 doFilterInternal 的 finally 清理) + CompanyContext.set(session.getCompanyId()); + return true; // 继续执行 Filter 链(进入 Controller) + } + + // ⚠️ 必须在此处清理 ThreadLocal,而非 onLoginSuccess—— + // onLoginSuccess 返回 true 后 Filter 链才继续执行,此时 Controller 尚未运行。 + // doFilterInternal 的 finally 块保证请求结束后(无论正常还是异常)都清理上下文。 + @Override + protected void doFilterInternal(ServletRequest req, ServletResponse res, FilterChain chain) + throws ServletException, IOException { + try { + super.doFilterInternal(req, res, chain); + } finally { + CompanyContext.clear(); + } + } + + private String extractToken(HttpServletRequest req) { + String header = req.getHeader("Authorization"); + if (header != null && header.startsWith("Bearer ")) { + return header.substring(7); + } + return null; + } +} +``` + +#### UserRealm(继承 AuthorizingRealm) + +```java +// com/label/common/shiro/UserRealm.java +public class UserRealm extends AuthorizingRealm { + + // 认证:从 Redis 验证 token:{uuid} 是否存在 + @Override + protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) { + String uuid = ((TokenAuthenticationToken) token).getToken(); + Map data = redisTemplate.opsForHash() + .entries(RedisKeyManager.tokenKey(uuid)); + if (data.isEmpty()) throw new UnknownAccountException("Token 无效或已过期"); + UserSession session = UserSession.from(uuid, data); + return new SimpleAuthenticationInfo(session, uuid, getName()); + } + + // 鉴权:先查 Redis user:perm:{userId},未命中查 PostgreSQL + @Override + protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) { + UserSession session = (UserSession) principals.getPrimaryPrincipal(); + String permKey = RedisKeyManager.userPermKey(session.getUserId()); + String cachedRole = (String) redisTemplate.opsForValue().get(permKey); + if (cachedRole == null) { + SysUser user = sysUserMapper.selectById(session.getUserId()); + cachedRole = user.getRole(); + redisTemplate.opsForValue().set(permKey, cachedRole, 5, MINUTES); + } + SimpleAuthorizationInfo info = new SimpleAuthorizationInfo(); + info.addRole(cachedRole); + // 角色继承:高级角色包含所有低级角色权限 + addInheritedRoles(info, cachedRole); + return info; + } + + private void addInheritedRoles(SimpleAuthorizationInfo info, String role) { + // ADMIN ⊃ REVIEWER ⊃ ANNOTATOR ⊃ UPLOADER + switch (role) { + case "ADMIN" -> info.addRoles(Set.of("REVIEWER", "ANNOTATOR", "UPLOADER")); + case "REVIEWER" -> info.addRoles(Set.of("ANNOTATOR", "UPLOADER")); + case "ANNOTATOR" -> info.addRoles(Set.of("UPLOADER")); + } + } +} +``` + +#### ShiroConfig 过滤器链 + +```java +@Bean +public ShiroFilterChainDefinition shiroFilterChainDefinition() { + DefaultShiroFilterChainDefinition chain = new DefaultShiroFilterChainDefinition(); + chain.addPathDefinition("/api/auth/login", "anon"); + chain.addPathDefinition("/api/**", "tokenFilter"); + return chain; +} +``` + +权限声明使用注解,禁止在 Service 内使用 if-role 临时判断: + +```java +@RequiresRoles("ADMIN") +public void createTask(...) + +@RequiresRoles(value = {"ANNOTATOR", "REVIEWER", "ADMIN"}, logical = Logical.OR) +public void claimTask(...) +``` + +#### 角色变更立即驱逐缓存 + +```java +// UserService.updateRole() 内 +@Transactional +public void updateRole(Long userId, String newRole) { + userMapper.updateRole(userId, newRole); + // 立即驱逐,不等 TTL 自然过期(延迟在禁用高权限账号时存在安全窗口) + redisTemplate.delete(RedisKeyManager.userPermKey(userId)); +} +``` + +### 3.5 Redis Key 管理 + +```java +// com/label/common/redis/RedisKeyManager.java +public final class RedisKeyManager { + public static String tokenKey(String uuid) { return "token:" + uuid; } + public static String userPermKey(Long userId) { return "user:perm:" + userId; } + public static String taskClaimKey(Long taskId){ return "task:claim:" + taskId; } +} +``` + +禁止在上述三类命名空间之外自造 Key 用于认证、权限或锁目的。 + +### 3.6 AOP 审计日志切面 + +```java +// com/label/common/aop/OperationLog.java(注解) +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface OperationLog { + String type(); // 对应 operation_type 枚举值,如 "TASK_CLAIM" + String targetType() default ""; +} + +// com/label/common/aop/AuditAspect.java +@Aspect +@Component +public class AuditAspect { + + @Around("@annotation(operationLog)") + public Object audit(ProceedingJoinPoint pjp, OperationLog operationLog) throws Throwable { + String result = "SUCCESS"; + String errorMsg = null; + Object returnVal = null; + try { + returnVal = pjp.proceed(); + } catch (Throwable ex) { + result = "FAIL"; + errorMsg = ex.getMessage(); + throw ex; + } finally { + // 业务事务已 commit/rollback 后,以独立操作写入审计记录 + // 审计写入失败只记录 error 日志,禁止回滚业务事务 + try { + saveAuditLog(operationLog.type(), operationLog.targetType(), result, errorMsg); + } catch (Exception auditEx) { + log.error("审计日志写入失败: type={}", operationLog.type(), auditEx); + } + } + return returnVal; + } + + private void saveAuditLog(String type, String targetType, String result, String errorMsg) { + UserSession session = getCurrentSession(); + SysOperationLog log = SysOperationLog.builder() + .companyId(CompanyContext.get()) + .operatorId(session != null ? session.getUserId() : null) + .operatorName(session != null ? session.getUsername() : "unknown") + .operationType(type) + .targetType(targetType.isEmpty() ? null : targetType) + .result(result) + .errorMessage(errorMsg) + .ipAddress(getClientIp()) + .build(); + operationLogMapper.insert(log); + } +} +``` + +### 3.7 RustFS S3 客户端 + +RustFS 实现 S3 兼容接口,使用 AWS SDK for Java v2 连接: + +```java +// com/label/common/storage/RustFsClient.java +@Component +public class RustFsClient { + + private final S3Client s3Client; // 配置 endpoint 指向 RustFS 地址 + + public String upload(String bucket, String key, InputStream data, long size) { + PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).build(); + s3Client.putObject(req, RequestBody.fromInputStream(data, size)); + return key; + } + + public InputStream download(String bucket, String key) { + GetObjectRequest req = GetObjectRequest.builder().bucket(bucket).key(key).build(); + return s3Client.getObject(req); + } + + public void delete(String bucket, String key) { + s3Client.deleteObject(b -> b.bucket(bucket).key(key)); + } + + public String getPresignedUrl(String bucket, String key, Duration expiry) { + S3Presigner presigner = S3Presigner.builder().endpointOverride(endpointUri).build(); + return presigner.presignGetObject(b -> b.signatureDuration(expiry) + .getObjectRequest(r -> r.bucket(bucket).key(key))).url().toString(); + } +} +``` + +**对象存储路径规范:** + +| 资源类型 | 存储桶 | 路径格式 | +|----------|--------|----------| +| 上传文本文件 | `source-data` | `text/{yyyyMM}/{source_id}.txt` | +| 上传图片 | `source-data` | `image/{yyyyMM}/{source_id}.jpg` | +| 上传视频 | `source-data` | `video/{yyyyMM}/{source_id}.mp4` | +| 视频帧图 | `source-data` | `frames/{source_id}/{frame_index}.jpg` | +| 视频片段转译文本 | `source-data` | `video-text/{parent_source_id}/{timestamp}.txt` | +| bbox 裁剪图 | `source-data` | `crops/{task_id}/{item_index}.jpg` | +| 导出 JSONL | `finetune-export` | `export/{batchUuid}.jsonl` | + +### 3.8 AI 服务 HTTP 客户端 + +Spring Boot 3 使用 `RestClient` 封装对 Python FastAPI 服务的同步 HTTP 调用: + +```java +// com/label/common/ai/AiServiceClient.java +@Component +public class AiServiceClient { + + private final RestClient restClient; + + public AiServiceClient(@Value("${ai.service.base-url}") String baseUrl) { + this.restClient = RestClient.builder().baseUrl(baseUrl).build(); + } + + // POST /api/v1/text/extract + public TextExtractResponse extractText(String filePath, String model) { + return restClient.post().uri("/api/v1/text/extract") + .body(Map.of("file_path", filePath, "model", model)) + .retrieve().body(TextExtractResponse.class); + } + + // POST /api/v1/image/extract + public ImageExtractResponse extractImage(String imagePath, String model) { + return restClient.post().uri("/api/v1/image/extract") + .body(Map.of("image_path", imagePath, "model", model)) + .retrieve().body(ImageExtractResponse.class); + } + + // POST /api/v1/video/extract-frames + public VideoFramesResponse extractFrames(String videoPath, String frameInterval, String mode) { + return restClient.post().uri("/api/v1/video/extract-frames") + .body(Map.of("video_path", videoPath, "frame_interval", frameInterval, "mode", mode)) + .retrieve().body(VideoFramesResponse.class); + } + + // POST /api/v1/video/to-text + public VideoToTextResponse videoToText(String videoPath, int startSec, int endSec, String model) { + return restClient.post().uri("/api/v1/video/to-text") + .body(Map.of("video_path", videoPath, "start_sec", startSec, "end_sec", endSec, "model", model)) + .retrieve().body(VideoToTextResponse.class); + } + + // POST /api/v1/qa/gen-text + public QaGenResponse genTextQa(List triples, String model, String promptTemplate) { + return restClient.post().uri("/api/v1/qa/gen-text") + .body(Map.of("items", triples, "model", model, "prompt_template", promptTemplate)) + .retrieve().body(QaGenResponse.class); + } + + // POST /api/v1/qa/gen-image + public QaGenResponse genImageQa(List quadruples, String model, String promptTemplate) { + return restClient.post().uri("/api/v1/qa/gen-image") + .body(Map.of("items", quadruples, "model", model, "prompt_template", promptTemplate)) + .retrieve().body(QaGenResponse.class); + } + + // POST /api/v1/finetune/start + public FinetuneStartResponse startFinetune(String jsonlUrl, String baseModel, Map params) { + return restClient.post().uri("/api/v1/finetune/start") + .body(Map.of("jsonl_url", jsonlUrl, "base_model", baseModel, "params", params)) + .retrieve().body(FinetuneStartResponse.class); + } + + // GET /api/v1/finetune/status/{jobId} + public FinetuneStatusResponse getFinetuneStatus(String jobId) { + return restClient.get().uri("/api/v1/finetune/status/{jobId}", jobId) + .retrieve().body(FinetuneStatusResponse.class); + } +} +``` + +--- + +## 四、业务模块纵切 + +### 4.1 用户与权限模块 + +**Entity(SysUser)**:字段同 DDL,`passwordHash` 字段加 `@JsonIgnore` 禁止序列化到响应。 + +**AuthService 核心逻辑:** + +```java +// 登录 +@OperationLog(type = "USER_LOGIN") +public String login(Long companyId, String username, String password, String ip) { + SysUser user = userMapper.selectByCompanyAndUsername(companyId, username); + if (user == null || !BCrypt.checkpw(password, user.getPasswordHash())) + throw new BusinessException("USER_NOT_FOUND", "用户名或密码错误"); + if ("DISABLED".equals(user.getStatus())) + throw new BusinessException("USER_DISABLED", "账号已禁用"); + + String token = UUID.randomUUID().toString(); + Map tokenData = Map.of( + "userId", user.getId(), "role", user.getRole(), "companyId", user.getCompanyId(), + "username", user.getUsername() + ); + long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L); + redisTemplate.opsForHash().putAll(RedisKeyManager.tokenKey(token), tokenData); + redisTemplate.expire(RedisKeyManager.tokenKey(token), ttl, SECONDS); + return token; +} + +// 退出登录 +@OperationLog(type = "USER_LOGOUT") +public void logout(String token) { + redisTemplate.delete(RedisKeyManager.tokenKey(token)); +} +``` + +**接口清单:** + +| 方法 | 路径 | 最低权限 | 说明 | +|------|------|----------|------| +| POST | `/api/auth/login` | 匿名 | 返回 Token(UUID) | +| POST | `/api/auth/logout` | 已登录 | 删除 Redis Token | +| GET | `/api/auth/me` | 已登录 | 当前用户信息与角色 | +| GET | `/api/users` | ADMIN | 分页查询用户列表 | +| POST | `/api/users` | ADMIN | 创建用户 | +| PUT | `/api/users/{id}` | ADMIN | 更新用户信息 | +| PUT | `/api/users/{id}/status` | ADMIN | 启用/禁用账号(同步驱逐权限缓存) | +| PUT | `/api/users/{id}/role` | ADMIN | 变更角色(同步驱逐权限缓存) | + +--- + +### 4.2 资料管理模块 + +**SourceService 核心逻辑:** + +```java +@OperationLog(type = "SOURCE_UPLOAD") +@Transactional +public SourceData upload(MultipartFile file, String dataType) { + Long companyId = CompanyContext.get(); + // 1. 先创建 source_data 记录获取 ID(用于构造存储路径) + SourceData sd = SourceData.builder() + .companyId(companyId).uploaderId(getCurrentUserId()) + .dataType(dataType).fileName(file.getOriginalFilename()) + .fileSize(file.getSize()).bucketName("source-data") + .status("PENDING").build(); + sourceDataMapper.insert(sd); + + // 2. 构造路径并上传至 RustFS + String path = buildPath(dataType, sd.getId()); + rustFsClient.upload("source-data", path, file.getInputStream(), file.getSize()); + + // 3. 更新 file_path + sd.setFilePath(path); + sourceDataMapper.updateById(sd); + return sd; +} + +private String buildPath(String dataType, Long sourceId) { + String ym = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM")); + return switch (dataType) { + case "TEXT" -> "text/" + ym + "/" + sourceId + ".txt"; + case "IMAGE" -> "image/" + ym + "/" + sourceId + ".jpg"; + case "VIDEO" -> "video/" + ym + "/" + sourceId + ".mp4"; + default -> throw new BusinessException("INVALID_TYPE", "不支持的资料类型"); + }; +} +``` + +**接口清单:** + +| 方法 | 路径 | 最低权限 | 说明 | +|------|------|----------|------| +| POST | `/api/source/upload` | UPLOADER | 上传文件,创建 source_data 记录 | +| GET | `/api/source/list` | UPLOADER | 分页查询(UPLOADER 只见自己,ADMIN 见全部) | +| GET | `/api/source/{id}` | UPLOADER | 查看资料详情 | +| DELETE | `/api/source/{id}` | ADMIN | 删除资料及 RustFS 文件 | + +--- + +### 4.3 任务管理模块(含并发控制) + +**TaskClaimService.claim() — 双重保障:** + +```java +@Transactional +@OperationLog(type = "TASK_CLAIM") +public void claim(Long taskId) { + Long userId = getCurrentUserId(); + // 第一重:Redis 分布式锁(TTL 30s,SET NX) + Boolean locked = redisTemplate.opsForValue() + .setIfAbsent(RedisKeyManager.taskClaimKey(taskId), userId.toString(), 30, SECONDS); + if (!Boolean.TRUE.equals(locked)) + throw new BusinessException("TASK_CLAIMED", "任务已被他人领取"); + + // 第二重:数据库乐观约束(WHERE status = 'UNCLAIMED') + // UPDATE annotation_task SET status='IN_PROGRESS', claimed_by=?, claimed_at=NOW() + // WHERE id=? AND status='UNCLAIMED' AND company_id=? + int affected = taskMapper.claimTask(taskId, userId, CompanyContext.get()); + if (affected == 0) + throw new BusinessException("TASK_CLAIMED", "任务已被他人领取"); + + // 写入任务历史(同一事务) + insertHistory(taskId, "UNCLAIMED", "IN_PROGRESS", userId, null); +} + +public void unclaim(Long taskId) { + StateValidator.assertTransition(TaskStatus.IN_PROGRESS, TaskStatus.UNCLAIMED, TaskStatus.TRANSITIONS); + // 更新 claimed_by = NULL, status = UNCLAIMED + taskMapper.unclaim(taskId, getCurrentUserId(), CompanyContext.get()); + redisTemplate.delete(RedisKeyManager.taskClaimKey(taskId)); + insertHistory(taskId, "IN_PROGRESS", "UNCLAIMED", getCurrentUserId(), null); +} +``` + +**每次 status 变更必须在同一事务中调用 `insertHistory()` 写入 `annotation_task_history`。** + +**接口清单:** + +| 方法 | 路径 | 最低权限 | 说明 | +|------|------|----------|------| +| POST | `/api/tasks` | ADMIN | 为指定 source 创建 EXTRACTION 任务 | +| GET | `/api/tasks/pool` | ANNOTATOR | 查看可领取任务列表(按角色过滤,分页)。**角色过滤规则**:ANNOTATOR 返回 `phase=EXTRACTION AND status=UNCLAIMED`;REVIEWER 返回 `phase=EXTRACTION AND status=UNCLAIMED` ∪ `phase=QA_GENERATION AND status=UNCLAIMED`(REVIEWER 含 ANNOTATOR 继承权限,可领取两类任务);ADMIN 返回所有 phase、所有 status 的任务(走 `GET /api/tasks`)。 | +| GET | `/api/tasks/pending-review` | REVIEWER | **审批收件箱**:返回 `status=SUBMITTED` 的任务列表(分页)。REVIEWER 看本公司全部待审批任务;ADMIN 同等权限。ANNOTATOR 无权访问此接口(403)。 | +| POST | `/api/tasks/{id}/claim` | ANNOTATOR | 领取任务(争抢式) | +| POST | `/api/tasks/{id}/unclaim` | ANNOTATOR | 放弃任务,退回任务池 | +| POST | `/api/tasks/{id}/reclaim` | ANNOTATOR | **重拾被驳回的任务**:仅当 `task.status=REJECTED AND task.claimed_by=currentUserId` 时合法;将 status 置为 IN_PROGRESS(状态机 REJECTED → IN_PROGRESS);写入任务历史。 | +| GET | `/api/tasks/mine` | ANNOTATOR | 查询我领取的任务列表(分页),**包含 REJECTED 状态**(让标注员发现被驳回的任务)。 | +| GET | `/api/tasks/{id}` | ANNOTATOR | 查看任务详情 | +| GET | `/api/tasks` | ADMIN | 查询全部任务(支持过滤,分页) | +| PUT | `/api/tasks/{id}/reassign` | ADMIN | 强制转移任务归属(更新 claimed_by,task status 保持 IN_PROGRESS,向 annotation_task_history 写入 from=IN_PROGRESS / to=IN_PROGRESS + note=转移原因) | + +--- + +### 4.4 标注工作台模块(EXTRACTION 阶段) + +**ExtractionService 核心逻辑:** + +```java +// AI 辅助预标注(标注员领取任务后调用) +public AnnotationResult aiPreAnnotate(Long taskId) { + AnnotationTask task = taskMapper.selectById(taskId); + SourceData source = sourceDataMapper.selectById(task.getSourceId()); + Object aiResult = "IMAGE".equals(source.getDataType()) || task.getVideoUnitType() != null + ? aiServiceClient.extractImage(resolveImagePath(task), task.getAiModel()) + : aiServiceClient.extractText(source.getFilePath(), task.getAiModel()); + AnnotationResult result = buildAnnotationResult(task, aiResult); + annotationResultMapper.insertOrReplace(result); + return result; +} + +// 更新提取结果(整体覆盖,PUT 语义) +public void updateResult(Long taskId, String resultJsonStr) { + // 验证 JSON 格式合法性 + validateResultJson(resultJsonStr); + // 整体替换 result_json,禁止局部 PATCH 或逐条追加 + annotationResultMapper.updateResultJson(taskId, resultJsonStr, CompanyContext.get()); +} + +// 审批通过——两阶段设计: +// 阶段一(同步 @Transactional):完成审批核心动作(步骤 1-3),立即返回 +// 阶段二(异步事件):AI 调用 + QA 任务创建(步骤 4-7)由 ExtractionApprovedEvent 驱动 +// ⚠️ 禁止将 AI HTTP 调用放入 @Transactional 内:会长期占用 DB 连接,且 AI 失败会回滚已完成的审批动作 +// ⚠️ 自审防护:提交人(claimed_by)不能同时作为审批人 +@Transactional +@OperationLog(type = "EXTRACTION_APPROVE") +public void approve(Long taskId) { + AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED"); + if (task.getClaimedBy().equals(getCurrentUserId())) + throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能审批自己提交的任务"); + AnnotationResult result = annotationResultMapper.selectByTaskId(taskId); + + // 1. annotation_result.is_final = true + result.setIsFinal(true); + annotationResultMapper.updateById(result); + + // 2. annotation_task.status → APPROVED + StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS); + task.setStatus("APPROVED"); + task.setCompletedAt(LocalDateTime.now()); + taskMapper.updateById(task); + + // 3. 写入任务历史 + insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null); + + // ---- 事务边界 ---- + // 步骤 4-7 通过 Spring ApplicationEvent 在事务提交后异步执行, + // 避免 AI HTTP 调用阻塞事务(参见 ExtractionApprovedEventListener) + applicationEventPublisher.publishEvent(new ExtractionApprovedEvent(taskId, task.getSourceId())); +} + +// ExtractionApprovedEventListener(@TransactionalEventListener(phase = AFTER_COMMIT),@Async) +// 4. 调用 AI 生成候选问答对 +// String promptKey = "IMAGE".equals(getSourceType(task)) ? "prompt_qa_gen_image" : "prompt_qa_gen_text"; +// QaGenResponse qaResponse = generateQa(task, result, promptTemplate); +// +// 5. 将候选问答对写入 training_dataset(PENDING_REVIEW) +// +// 6. 创建 QA_GENERATION 阶段任务(UNCLAIMED) +// +// 7. source_data.status → QA_REVIEW +``` + +**ExtractionService.reject() 核心逻辑:** + +```java +// 驳回——task 回退至 REJECTED,由标注员重新 claim 后进入 IN_PROGRESS +@Transactional +@OperationLog(type = "EXTRACTION_REJECT") +public void reject(Long taskId, String reason) { + AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED"); + if (task.getClaimedBy().equals(getCurrentUserId())) + throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能驳回自己提交的任务"); + + StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS); + task.setStatus("REJECTED"); + task.setRejectReason(reason); + taskMapper.updateById(task); + insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason); + // ⚠️ source_data 状态保持 EXTRACTING(不回退),等标注员重新 claim 后继续修改 +} +``` + +> **source_data EXTRACTING 状态说明**:任务从任务池创建(`ADMIN POST /api/tasks`)时,后端同步将 `source_data.status → EXTRACTING`。EXTRACTION 审批驳回后 source_data 保持 EXTRACTING 不变,标注员重新领取修改后重提,审批通过后再推进至 QA_REVIEW。 + +**接口清单:** + +| 方法 | 路径 | 最低权限 | 说明 | +|------|------|----------|------| +| GET | `/api/extraction/{taskId}` | ANNOTATOR | 获取当前提取结果(含 AI 预标注) | +| PUT | `/api/extraction/{taskId}` | ANNOTATOR | 更新提取结果(整体 JSONB 覆盖) | +| POST | `/api/extraction/{taskId}/submit` | ANNOTATOR | 提交提取结果,进入审批队列 | +| POST | `/api/extraction/{taskId}/approve` | REVIEWER | 审批通过(自审防护:不能审批自己提交的任务),自动触发 QA 任务创建 | +| POST | `/api/extraction/{taskId}/reject` | REVIEWER | 驳回(自审防护同上),附驳回原因;task → REJECTED,标注员需重新 claim | + +--- + +### 4.5 问答生成模块(QA_GENERATION 阶段) + +`QaService` 的整体覆盖逻辑与 `ExtractionService` 一致(PUT 语义,禁止局部 PATCH)。 + +**QaService.reject() 核心逻辑:** + +```java +@Transactional +@OperationLog(type = "QA_REJECT") +public void reject(Long taskId, String reason) { + AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED"); + if (task.getClaimedBy().equals(getCurrentUserId())) + throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能驳回自己提交的任务"); + + // training_dataset.status → REJECTED(标注员修改后重提,状态回 PENDING_REVIEW) + trainingDatasetMapper.rejectByTaskId(taskId, reason, CompanyContext.get()); + + StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS); + task.setStatus("REJECTED"); + task.setRejectReason(reason); + taskMapper.updateById(task); + insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason); + // source_data 保持 QA_REVIEW,等标注员重新领取修改后重提 +} +``` + +**approve 级联动作(同一事务):** + +```java +@Transactional +@OperationLog(type = "QA_APPROVE") +public void approve(Long taskId) { + AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED"); + if (task.getClaimedBy().equals(getCurrentUserId())) + throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能审批自己提交的任务"); + + // 1. training_dataset.status → APPROVED + trainingDatasetMapper.approveByTaskId(taskId, getCurrentUserId(), CompanyContext.get()); + + // 2. annotation_task.status → APPROVED + task.setStatus("APPROVED"); + task.setCompletedAt(LocalDateTime.now()); + taskMapper.updateById(task); + + // 3. source_data.status → APPROVED(整条流水线完成) + sourceDataMapper.updateStatus(task.getSourceId(), "APPROVED", CompanyContext.get()); + + // 4. 写入任务历史 + insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null); +} +``` + +**接口清单:** + +| 方法 | 路径 | 最低权限 | 说明 | +|------|------|----------|------| +| GET | `/api/qa/{taskId}` | ANNOTATOR | 获取候选问答对列表 | +| PUT | `/api/qa/{taskId}` | ANNOTATOR | 修改问答对(整体覆盖) | +| POST | `/api/qa/{taskId}/submit` | ANNOTATOR | 提交问答对,进入审批队列 | +| POST | `/api/qa/{taskId}/approve` | REVIEWER | 审批通过(自审防护),training_dataset → APPROVED,source_data → APPROVED | +| POST | `/api/qa/{taskId}/reject` | REVIEWER | 驳回(自审防护),training_dataset → REJECTED;标注员需重新 claim 修改后重提(重提后 training_dataset → PENDING_REVIEW) | + +--- + +### 4.6 训练数据导出模块 + +**ExportService.createBatch() 核心逻辑:** + +```java +@Transactional +@OperationLog(type = "EXPORT_CREATE") +public ExportBatch createBatch(List sampleIds) { + Long companyId = CompanyContext.get(); + // 1. 校验样本均为 APPROVED 状态 + List samples = trainingDatasetMapper + .selectByIdsAndStatus(sampleIds, "APPROVED", companyId); + if (samples.size() != sampleIds.size()) + throw new BusinessException("INVALID_SAMPLES", "部分样本不处于 APPROVED 状态"); + + // 2. 生成 JSONL 内容(每行一个 glm_format_json) + String batchUuid = UUID.randomUUID().toString(); + String jsonl = samples.stream() + .map(s -> s.getGlmFormatJson().toString()) + .collect(Collectors.joining("\n")); + + // 3. 上传至 RustFS:finetune-export/export/{batchUuid}.jsonl + String path = "export/" + batchUuid + ".jsonl"; + byte[] bytes = jsonl.getBytes(StandardCharsets.UTF_8); + rustFsClient.upload("finetune-export", path, new ByteArrayInputStream(bytes), bytes.length); + + // 4. 更新样本 export_batch_id 与 exported_at + trainingDatasetMapper.batchUpdateExportInfo(sampleIds, batchUuid, LocalDateTime.now()); + + // 5. 写入 export_batch 记录 + ExportBatch batch = ExportBatch.builder() + .companyId(companyId).batchUuid(batchUuid).datasetFilePath(path) + .sampleCount(samples.size()).finetuneStatus("NOT_STARTED") + .createdBy(getCurrentUserId()).build(); + exportBatchMapper.insert(batch); + return batch; +} +``` + +**FinetuneService.trigger():** 调用 `aiServiceClient.startFinetune(...)` 获取 `glmJobId`,更新 `export_batch.glm_job_id` 和 `finetune_status = RUNNING`。 + +**接口清单(全部需要 ADMIN 权限):** + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/training/samples` | 分页查询已审批通过、待导出的样本 | +| POST | `/api/export/batch` | 创建导出批次,合并 JSONL 并上传 RustFS | +| POST | `/api/export/{batchId}/finetune` | 向 GLM 工厂提交微调任务 | +| GET | `/api/export/{batchId}/status` | 查询微调任务状态 | +| GET | `/api/export/list` | 分页查询所有导出批次 | + +--- + +### 4.7 系统配置模块 + +**SysConfigService.get(configKey):** 先按 `(companyId, configKey)` 查,未命中则按 `(NULL, configKey)` 查全局默认值,公司级配置优先覆盖全局值。 + +```java +public String get(String configKey) { + Long companyId = CompanyContext.get(); + // 先查公司级配置 + SysConfig cfg = configMapper.selectByCompanyAndKey(companyId, configKey); + if (cfg == null) { + // 回退到全局默认(company_id IS NULL) + cfg = configMapper.selectByCompanyAndKey(null, configKey); + } + return cfg != null ? cfg.getConfigValue() : null; +} +``` + +**接口清单(全部需要 ADMIN 权限):** + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/config` | 获取所有配置项(公司级 + 全局默认) | +| PUT | `/api/config/{key}` | 更新单项配置(含 Prompt 模板) | + +--- + +### 4.8 视频处理模块 + +**接口清单(全部需要 ADMIN 权限):** + +| 方法 | 路径 | 说明 | +|------|------|------| +| POST | `/api/video/{sourceId}/process-frames` | 触发帧模式预处理(创建 FRAME_EXTRACT 任务,source_data → PREPROCESSING) | +| POST | `/api/video/{sourceId}/process-to-text` | 触发片段模式预处理(创建 VIDEO_TO_TEXT 任务,source_data → PREPROCESSING) | +| GET | `/api/video/{sourceId}/jobs` | 查询该视频的所有异步处理任务列表 | +| PUT | `/api/video/jobs/{jobId}/reset` | 将 FAILED 状态的任务手动重置为 PENDING(仅 ADMIN 可操作) | +| POST | `/api/video/jobs/{jobId}/callback` | AI 服务回调接口(内网调用,不经过 Shiro Token 验证,通过 IP 白名单保护) | + +**VideoProcessService 核心逻辑:** + +```java +@Transactional +@OperationLog(type = "TASK_CREATE") +public VideoProcessJob createJob(Long sourceId, String jobType, JsonNode params) { + Long companyId = CompanyContext.get(); + // 1. source_data.status → PREPROCESSING + sourceDataMapper.updateStatus(sourceId, "PREPROCESSING", companyId); + // 2. 创建 video_process_job(PENDING) + VideoProcessJob job = VideoProcessJob.builder() + .companyId(companyId).sourceId(sourceId) + .jobType(jobType).params(params).status("PENDING").build(); + videoProcessJobMapper.insert(job); + // 3. 异步调用 AI 服务(非阻塞,由 AI 服务自行回调) + triggerAiAsync(job); + return job; +} + +// AI 服务回调:幂等处理 +@Transactional +public void handleCallback(Long jobId, boolean success, String outputPath, String errorMsg) { + VideoProcessJob job = videoProcessJobMapper.selectById(jobId); + // 幂等:已是 SUCCESS 状态则静默忽略重复回调,不得重复创建 annotation_task + if ("SUCCESS".equals(job.getStatus())) return; + + if (success) { + job.setStatus("SUCCESS"); + job.setOutputPath(outputPath); + job.setCompletedAt(LocalDateTime.now()); + // source_data.status → PENDING(进入后续标注流程) + sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId()); + + // ⚠️ 两种模式的后续任务创建(ADMIN 看到 PENDING 后可手动创建标注任务) + if ("FRAME_EXTRACT".equals(job.getJobType())) { + // 帧模式:AI 返回帧列表,每帧对应一个 annotation_task(phase=EXTRACTION,video_unit_type=FRAME) + // outputPath 指向帧列表 JSON 文件(存 RustFS),由 ADMIN 在 POST /api/tasks 时按帧创建任务 + // 不在此处自动创建 EXTRACTION 任务——ADMIN 决定如何调度多帧任务 + } else if ("VIDEO_TO_TEXT".equals(job.getJobType())) { + // 片段模式:派生 TEXT 类型 source_data,parent_source_id 指向原视频 + // 不在此处创建——ADMIN 收到 PENDING 通知后手动通过 POST /api/tasks 为派生 TEXT 资料创建任务 + // 派生 source_data 已在 triggerAiAsync() 中预创建(status=PENDING) + } + } else { + if (job.getRetryCount() >= job.getMaxRetries()) { + // 达最大重试次数,置 FAILED,source_data → PENDING 保留 error_message,需 ADMIN 手动重置 + job.setStatus("FAILED"); + job.setErrorMessage(errorMsg); + sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId()); + } else { + job.setStatus("RETRYING"); + job.setRetryCount(job.getRetryCount() + 1); + job.setErrorMessage(errorMsg); + } + } + videoProcessJobMapper.updateById(job); +} +``` + +--- + +## 五、状态机实现规范 + +所有状态变更**必须**经过 `StateValidator.assertTransition()` 校验,禁止绕过直接调用 Mapper 更新状态字段。 + +### 5.1 StateValidator + +```java +// com/label/common/statemachine/StateValidator.java +public final class StateValidator { + public static void assertTransition(S from, S to, Map> transitions) { + Set allowed = transitions.getOrDefault(from, Set.of()); + if (!allowed.contains(to)) + throw new BusinessException("INVALID_STATE_TRANSITION", + "非法状态转换: " + from + " → " + to); + } +} +``` + +### 5.2 source_data 状态机 + +```java +public enum SourceStatus { + PENDING, PREPROCESSING, EXTRACTING, QA_REVIEW, APPROVED, REJECTED; + + public static final Map> TRANSITIONS = Map.of( + PENDING, Set.of(EXTRACTING, PREPROCESSING), + PREPROCESSING, Set.of(PENDING), + EXTRACTING, Set.of(QA_REVIEW), + QA_REVIEW, Set.of(APPROVED) + // ⚠️ source_data 不会进入 REJECTED 状态: + // QA 被驳回时 annotation_task → REJECTED,source_data 保持 QA_REVIEW。 + // 整条流水线唯一终态为 APPROVED。 + ); +} +``` + +### 5.3 annotation_task 状态机 + +```java +public enum TaskStatus { + UNCLAIMED, IN_PROGRESS, SUBMITTED, APPROVED, REJECTED; + + public static final Map> TRANSITIONS = Map.of( + UNCLAIMED, Set.of(IN_PROGRESS), + IN_PROGRESS, Set.of(SUBMITTED, UNCLAIMED, IN_PROGRESS), + // IN_PROGRESS → IN_PROGRESS 用于 ADMIN 强制转移(持有人变更,状态不变) + SUBMITTED, Set.of(APPROVED, REJECTED), + REJECTED, Set.of(IN_PROGRESS) // 驳回后重拾 + ); +} +``` + +### 5.4 training_dataset 状态机 + +```java +public enum DatasetStatus { + PENDING_REVIEW, APPROVED, REJECTED; + + public static final Map> TRANSITIONS = Map.of( + PENDING_REVIEW, Set.of(APPROVED, REJECTED), + REJECTED, Set.of(PENDING_REVIEW) // 驳回后可修改重提 + ); +} +``` + +### 5.5 video_process_job 状态机 + +```java +public enum VideoJobStatus { + PENDING, RUNNING, SUCCESS, FAILED, RETRYING; + + public static final Map> TRANSITIONS = Map.of( + PENDING, Set.of(RUNNING), + RUNNING, Set.of(SUCCESS, RETRYING, FAILED), + RETRYING, Set.of(RUNNING, FAILED) + // FAILED → PENDING 由 ADMIN 手动触发接口,不在此处声明自动流转 + ); +} +``` + +--- + +## 六、Docker Compose 配置 + +```yaml +# docker-compose.yml +version: "3.9" + +services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_DB: label_db + POSTGRES_USER: label + POSTGRES_PASSWORD: label_password + volumes: + - postgres_data:/var/lib/postgresql/data + - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U label -d label_db"] + interval: 10s + timeout: 5s + retries: 5 + + redis: + image: redis:7-alpine + command: redis-server --requirepass redis_password + volumes: + - redis_data:/data + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "-a", "redis_password", "ping"] + interval: 10s + timeout: 5s + retries: 5 + + rustfs: + image: rustfs/rustfs:latest # 替换为生产可用的实际镜像 + environment: + RUSTFS_ACCESS_KEY: minioadmin + RUSTFS_SECRET_KEY: minioadmin + volumes: + - rustfs_data:/data + ports: + - "9000:9000" # S3 API 端口 + - "9001:9001" # Web 控制台端口 + + backend: + build: + context: . + dockerfile: Dockerfile + environment: + SPRING_DATASOURCE_URL: jdbc:postgresql://postgres:5432/label_db + SPRING_DATASOURCE_USERNAME: label + SPRING_DATASOURCE_PASSWORD: label_password + SPRING_REDIS_HOST: redis + SPRING_REDIS_PORT: 6379 + SPRING_REDIS_PASSWORD: redis_password + RUSTFS_ENDPOINT: http://rustfs:9000 + RUSTFS_ACCESS_KEY: minioadmin + RUSTFS_SECRET_KEY: minioadmin + AI_SERVICE_BASE_URL: http://ai-service:8000 + ports: + - "8080:8080" + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + rustfs: + condition: service_started + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"] + interval: 15s + retries: 5 + + ai-service: + build: + context: ../ai_service # Python FastAPI 服务所在目录 + dockerfile: Dockerfile + environment: + RUSTFS_ENDPOINT: http://rustfs:9000 + RUSTFS_ACCESS_KEY: minioadmin + RUSTFS_SECRET_KEY: minioadmin + ports: + - "8000:8000" + depends_on: + - rustfs + + frontend: + image: nginx:alpine + volumes: + - ../frontend/dist:/usr/share/nginx/html:ro + - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro + ports: + - "80:80" + depends_on: + backend: + condition: service_healthy + +volumes: + postgres_data: + redis_data: + rustfs_data: +``` + +**Dockerfile(backend):** + +```dockerfile +FROM eclipse-temurin:17-jre-alpine +WORKDIR /app +COPY target/label-backend-*.jar app.jar +EXPOSE 8080 +ENTRYPOINT ["java", "-jar", "app.jar"] +``` + +--- + +## 七、测试策略 + +### 7.1 基本原则 + +- 集成测试**必须**使用真实的 PostgreSQL 和 Redis 实例(Testcontainers),禁止仅 Mock 数据库 +- 每个受保护接口组**必须**有至少一个集成测试覆盖 Shiro 认证/鉴权过滤器链(缺少 Token → 401,角色不足 → 403) + +### 7.2 并发任务领取测试(必须) + +```java +@Test +void concurrentClaimShouldOnlySucceedOnce() throws InterruptedException { + // 准备:创建一个 UNCLAIMED 任务 + Long taskId = createUnclaimedTask(); + int threadCount = 10; + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger successCount = new AtomicInteger(0); + ExecutorService pool = Executors.newFixedThreadPool(threadCount); + + for (int i = 0; i < threadCount; i++) { + pool.submit(() -> { + try { + latch.await(); + taskClaimService.claim(taskId); + successCount.incrementAndGet(); + } catch (BusinessException e) { + // 预期:其余线程抛出"任务已被领取" + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + latch.countDown(); + pool.awaitTermination(5, SECONDS); + + // 验证:仅 1 个线程领取成功 + assertThat(successCount.get()).isEqualTo(1); + AnnotationTask task = taskMapper.selectById(taskId); + assertThat(task.getStatus()).isEqualTo("IN_PROGRESS"); + assertThat(task.getClaimedBy()).isNotNull(); +} +``` + +### 7.3 视频回调幂等测试(必须) + +```java +@Test +void duplicateSuccessCallbackShouldBeIdempotent() { + Long sourceId = createVideoSource(); + VideoProcessJob job = videoProcessService.createJob(sourceId, "FRAME_EXTRACT", params); + + // 第一次成功回调 + videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null); + // 重复成功回调 + videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null); + + // 验证:只创建一个 annotation_task + long taskCount = annotationTaskMapper.countBySourceId(sourceId); + assertThat(taskCount).isEqualTo(1); +} +``` + +### 7.4 状态机越界拒绝测试 + +```java +@Test +void illegalStateTransitionShouldThrow() { + // 验证:APPROVED → IN_PROGRESS 被拒绝 + assertThatThrownBy(() -> + StateValidator.assertTransition(TaskStatus.APPROVED, TaskStatus.IN_PROGRESS, TaskStatus.TRANSITIONS) + ).isInstanceOf(BusinessException.class) + .hasMessageContaining("非法状态转换"); +} +``` + +### 7.5 多租户隔离测试 + +```java +@Test +void companyACannotAccessCompanyBData() { + Long sourceIdB = createSourceForCompany(companyB); + // 以 companyA 身份请求 companyB 的资源 + CompanyContext.set(companyA); + assertThatThrownBy(() -> sourceService.findById(sourceIdB)) + .isInstanceOf(BusinessException.class); +} +``` + +--- + +## 八、宪章合规检查清单 + +PR 合并前评审人**必须**逐条核对以下清单: + +| # | 宪章原则 | 实现位置 | 检查要点 | +|---|----------|----------|----------| +| 1 | 环境约束(JDK 17、SB 3、Shiro、MyBatis Plus) | `pom.xml` | 版本号符合约束;无 Spring Security 并行引入 | +| 2 | 多租户数据隔离 | `TenantLineInnerInterceptor`、`CompanyContext` | 所有 Mapper 自动注入 company_id;ThreadLocal 在 finally 块清理 | +| 3 | BCrypt 密码、UUID Token、滑动过期、禁 JWT | `AuthService`、`TokenFilter`、`RedisKeyManager` | 无明文密码存储;每次有效请求重置 TTL;无 JWT 库引入 | +| 4 | 分级 RBAC、权限注解、角色变更驱逐缓存 | `UserRealm`、`@RequiresRoles`、`UserService` | 无 if-role 临时判断;`updateRole()` 立即删缓存 | +| 5 | 双流水线、级联触发 QA 任务、parent_source_id 溯源 | `ExtractionService.approve()` | 级联动作在同一 @Transactional 内;视频转文本的 parent_source_id 不为空 | +| 6 | 状态机完整性 | `StateValidator`、各 `*Status` 枚举 | 所有状态变更调用 `StateValidator`;无绕过直接写 Mapper 的路径 | +| 7 | 任务争抢双重保障 | `TaskClaimService.claim()` | Redis SET NX + DB WHERE status='UNCLAIMED' 两道并存 | +| 8 | 异步任务幂等、重试上限、FAILED 需手动重置 | `VideoProcessService.handleCallback()` | 重复成功回调静默忽略;`retry_count >= max_retries` 置 FAILED | +| 9 | 只追加审计日志、AOP 切面、审计失败不回滚业务 | `AuditAspect`、`@OperationLog` | `sys_operation_log` 无 UPDATE/DELETE;审计异常仅 error 日志 | +| 10 | RESTful URL、统一响应格式、分页必须 | `Result`、各 Controller | 无动词路径;无裸 POJO 返回;所有列表接口有分页参数 | +| 11 | YAGNI:业务在 Service,Controller 只处理 HTTP | 包结构 | Controller 无业务判断逻辑;无未使用的抽象层 | + +--- + +## 九、设计评审报告 + +**评审日期**:2026-04-09 | **评审方式**:独立子 Agent 五维评审 + 人工复核 | **初始质量评分**:6.5/10 → 修复后:**8.5/10** + +### 9.1 发现并已修复的问题 + +| # | 严重级 | 位置 | 问题描述 | 修复内容 | +|---|--------|------|----------|----------| +| A | CRITICAL | §4.3 任务池接口 | `GET /api/tasks/pool` 的角色过滤逻辑不明确,开发者无法判断各角色看到哪些任务 | 在接口说明中补充:ANNOTATOR 看 EXTRACTION-UNCLAIMED;REVIEWER 看两类 UNCLAIMED;ADMIN 走全量接口 | +| B | HIGH | §4.4、§4.5 审批接口 | approve/reject 接口未声明"提交人不能自审"约束,存在质量绕过风险 | 在 `approve()` 和 `reject()` 代码示例中加入自审防护检查:`if (task.getClaimedBy().equals(getCurrentUserId())) throw` | +| C | HIGH | §4.8 视频处理 | 视频处理模块完全缺失 REST API 接口清单,帧模式/片段模式成功后的后续任务创建分支不清晰 | 补充 5 条接口(含 ADMIN-only 权限),在 handleCallback() 中明确两种模式的处理路径差异 | +| D | MEDIUM | §4.4 | 缺少 `ExtractionService.reject()` 代码示例,驳回后 source_data 状态不明确 | 补充 reject() 完整实现;说明 source_data 保持 EXTRACTING 等待标注员重提 | +| E | MEDIUM | §4.5 | 缺少 `QaService.reject()` 代码示例,training_dataset 驳回后状态流转不清晰 | 补充 reject():training_dataset → REJECTED;重提后 → PENDING_REVIEW | +| F | LOW | §4.3 | reassign 接口说明过于简洁,未说明 annotation_task_history 写入规则 | 补充说明:保持 IN_PROGRESS,history 写入 from=IN_PROGRESS/to=IN_PROGRESS + note | + +### 9.2 未修复的设计选择(需业务澄清) + +| # | 问题 | 背景 | 建议 | +|---|------|------|------| +| G | 原需求提到"若任务池中无空闲审批员,由标注员兼任审批" | 宪章要求 REVIEWER ⊃ ANNOTATOR 角色继承,系统无法动态提升 ANNOTATOR 权限 | **建议取消此机制**:统一要求 QA 审批必须由 REVIEWER 或 ADMIN 执行;若无可用 REVIEWER,由 ADMIN 代为审批。若需保留,需修改宪章并引入动态权限提升机制(超出当前架构范围) | +| H | `GET /api/tasks/{id}` 的精细访问控制 | ANNOTATOR 是否只能查看自己的任务?REJECTED 任务是否对原领取人可见? | 当前设计依赖 `company_id` 隔离,建议 Service 层加 `claimed_by = currentUserId OR role >= REVIEWER` 的访问控制检查 | + +### 9.3 修复后宪章合规状态 + +| 宪章原则 | 修复前 | 修复后 | +|----------|--------|--------| +| 四、分级 RBAC(权限注解声明) | ✅ 基本覆盖 | ✅ 补充了自审防护,任务池过滤规则完整 | +| 五、双标注流水线(级联触发规则) | ⚠️ reject 路径缺失 | ✅ reject() 示例补全,source_data 回退路径明确 | +| 六、状态机完整性 | ⚠️ QA reject 状态转换不明 | ✅ training_dataset REJECTED → PENDING_REVIEW 完整 | +| 八、异步任务处理(幂等回调) | ⚠️ 两种视频模式处理分支缺失 | ✅ handleCallback 分支注释明确 | + +--- + +### 9.4 审批流程合理性专项评审(第二轮) + +**评审日期**:2026-04-09 | **评审焦点**:审批工作流是否可被实际执行 + +#### 9.4.1 发现的问题 + +| # | 严重级 | 位置 | 问题描述 | 根因 | 建议修复 | +|---|--------|------|----------|------|----------| +| I | CRITICAL | §4.3、§4.4、§4.5 | **REVIEWER 无审批收件箱**:`GET /api/tasks/pool` 仅返回 UNCLAIMED 任务,REVIEWER 无法发现哪些任务处于 SUBMITTED 状态等待审批。整个审批流程在 API 层面缺失入口。 | 任务池设计仅服务于"领取"场景,未考虑"审批"场景 | 新增接口:`GET /api/tasks/pending-review`(权限 REVIEWER),返回 `phase=EXTRACTION OR QA_GENERATION AND status=SUBMITTED` 的任务列表(分页);ADMIN 可查全部 | +| J | HIGH | §4.4 `ExtractionService.approve()` | **@Transactional 内同步调用 AI 服务**:`generateQa()` 在事务中发起 HTTP 请求,可能耗时 5–30 秒。将导致:① DB 连接被长期占用(连接池耗尽风险);② AI 调用失败会回滚已完成的审批动作(标注员和审批员的工作丢失);③ 审批接口响应时间不可预期。 | 审批与 QA 生成未解耦 | **两阶段拆分**:`approve()` 事务只完成步骤 1–3(is_final、APPROVED、history),然后发布 `ExtractionApprovedEvent`;`QaGenerationListener` 异步消费该事件,完成步骤 4–7(AI 调用、生成 QA、创建 QA 任务、更新 source_data)。若 AI 调用失败,审批结果不回滚,QA 任务可重试 | +| K | HIGH | §4.3、§4.4、§4.5 | **REJECTED 任务重拾路径未实现**:驳回后 task.status = REJECTED,标注员需"重新 claim",但:① `GET /api/tasks/pool` 仅展示 UNCLAIMED 任务,REJECTED 任务不可见;② 没有 `POST /api/tasks/{id}/reclaim` 接口;③ 状态机定义了 `REJECTED → IN_PROGRESS` 合法转换,但无 API 触发它。被驳回的标注员无任何操作路径。 | 驳回后续路径仅在状态机中声明,未转化为接口 | 方案一:修改 `GET /api/tasks/mine` 包含 REJECTED 状态,新增 `POST /api/tasks/{id}/reclaim` 接口(权限 ANNOTATOR,状态机校验 REJECTED → IN_PROGRESS,设置 status=IN_PROGRESS)。方案二:驳回时直接将 task 置为 UNCLAIMED,退回公共任务池(原领取人或他人均可重新领取)。**推荐方案一**(保留原领取人优先权,避免任务被他人抢占) | +| L | MEDIUM | §4.5 `QaService.approve()` | **重复变量声明(编译错误)**:方法内 `task` 变量被声明两次(第 1057 行与第 1065 行),Java 不允许同作用域内重复声明,此代码无法编译通过。 | 复制粘贴失误 | 删除第 1065 行的 `AnnotationTask task =`,直接使用第 1057 行已声明的 `task` 变量 | +| M | MEDIUM | §5.2 `SourceStatus` 状态机 | **source_data.REJECTED 状态不可达**:状态机声明 `QA_REVIEW → REJECTED` 合法,但 `QaService.reject()` 明确注释"source_data 保持 QA_REVIEW",从不触发此转换。REJECTED 是死状态,状态机与实现不一致,可能误导实现者。 | 状态机定义未与实现对齐 | 从 `SourceStatus.TRANSITIONS` 中移除 `QA_REVIEW → REJECTED` 转换,并在注释中说明:source_data 不会进入 REJECTED;QA 被驳回时 source_data 保持 QA_REVIEW,仅 annotation_task 进入 REJECTED | + +#### 9.4.2 审批工作流完整路径(修复后期望状态) + +``` +EXTRACTION 阶段: + ANNOTATOR 领取(/api/tasks/{id}/claim) + → 标注工作台提交(/api/extraction/{taskId}/submit) + → REVIEWER 从审批收件箱发现(/api/tasks/pending-review) + → 审批通过(/api/extraction/{taskId}/approve) + → [异步] AI 生成 QA → 创建 QA_GENERATION 任务 → source_data → QA_REVIEW + → 审批驳回(/api/extraction/{taskId}/reject) + → 标注员从"我的任务"看到 REJECTED 任务 + → 重拾(/api/tasks/{id}/reclaim)→ IN_PROGRESS → 修改后重提 + +QA_GENERATION 阶段: + ANNOTATOR/REVIEWER 领取(/api/tasks/{id}/claim) + → 问答对修改提交(/api/qa/{taskId}/submit) + → REVIEWER 从审批收件箱发现(/api/tasks/pending-review) + → 审批通过(/api/qa/{taskId}/approve) + → training_dataset → APPROVED → source_data → APPROVED(流水线终态) + → 审批驳回(/api/qa/{taskId}/reject) + → training_dataset → REJECTED;source_data 保持 QA_REVIEW + → 标注员重拾(/api/tasks/{id}/reclaim)→ 修改后重提 +``` + +#### 9.4.3 需立即修复的条目 + +| 问题 | 修复位置 | 修复类型 | +|------|----------|----------| +| I(无审批收件箱) | §4.3 接口清单 | 新增接口 `GET /api/tasks/pending-review` | +| J(@Transactional 内 AI 调用) | §4.4 ExtractionService.approve() | 拆分为两阶段,步骤 4-7 异步化 | +| K(REJECTED 重拾路径缺失) | §4.3 接口清单 + §4.4/4.5 说明 | 新增接口 `POST /api/tasks/{id}/reclaim`;`GET /api/tasks/mine` 包含 REJECTED | +| L(重复变量声明) | §4.5 QaService.approve() 代码 | 删除第二个 `AnnotationTask task =` 声明 | +| M(source_data 死状态) | §5.2 SourceStatus 状态机 | 移除 `QA_REVIEW → REJECTED` 转换 |