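# label_backend Development and Implementation Guide

**Version: 1.0.0 | Date: 2026-04-09 | Based on: backend后台设计.md v2.2 + constitution.md v1.1.0**

---

## 1. Project Overview

### 1.1 System Positioning

label_backend is the backend service of a knowledge-graph intelligent annotation platform. Built on Spring Boot 3, it drives two annotation pipelines, the **text line** and the **image line**, which turn raw documents, images, and videos into training datasets in GLM fine-tuning format.

### 1.2 Data Pipelines

```
Text line:
  document → triple extraction (subject / predicate / object + source snippet + character offsets)
           → QA pair generation → review → training sample (GLM text format)

Image line:
  image → quadruple extraction (subject / relation / object / modifier + bbox + cropped image)
        → QA pair generation → review → training sample (GLM image-text format)

Video preprocessing (not an independent pipeline):
  Frame mode:   video → AI frame extraction → each frame enters the image line as an image
  Segment mode: video → multimodal model converts to text → derived TEXT source_data → enters the text line
```

### 1.3 Technology Stack

| Layer | Technology | Version constraint |
|------|------|----------|
| Runtime | JDK | 17 (LTS) |
| Framework | Spring Boot | ≥ 3.2.x (`RestClient`, used in section 3.8, was introduced in Spring Boot 3.2) |
| Authentication / authorization | Apache Shiro | ≥ 1.13.x (compatible with Spring Boot 3) |
| ORM | MyBatis Plus | ≥ 3.5.x |
| Database | PostgreSQL | ≥ 14 |
| Cache / locks | Redis | ≥ 6.x |
| Object storage | RustFS (S3-compatible API) | Current stable release |
| AI service | Python FastAPI (called over HTTP) | The JVM side only makes RestClient calls |
| Containerization | Docker Compose | ≥ 2.x |
| Build tool | Maven | ≥ 3.8 |

### 1.4 Backend Modules

| Module | Responsibility |
|------|------|
| User and permission module | User management, Shiro RBAC, token authentication |
| Source data management module | File upload to RustFS, source_data metadata management |
| Task pool module | Task creation, claiming (distributed lock + optimistic lock), status transitions |
| Annotation workbench module | Calls AI to extract triples/quadruples, writes annotation_result JSONB |
| QA generation module | Calls AI to generate candidate QA pairs, manages training_dataset |
| Training data export module | JSONL batch export, GLM fine-tuning integration |
| System configuration module | Prompt templates, model parameters, global and company-level configuration |
| Video processing module | Tracks asynchronous frame-extraction / video-to-text jobs, idempotent callback handling |

### 1.5 Package Structure

```
com.label
├── LabelBackendApplication.java
├── common/
│   ├── result/          # Result, ResultCode, PageResult
│   ├── exception/       # BusinessException, GlobalExceptionHandler
│   ├── context/         # CompanyContext (ThreadLocal)
│   ├── shiro/           # TokenFilter, UserRealm, ShiroConfig
│   ├── redis/           # RedisKeyManager, RedisService
│   ├── aop/             # AuditAspect, @OperationLog annotation
│   ├── storage/         # RustFsClient (S3-compatible wrapper)
│   ├── ai/              # AiServiceClient (RestClient wrapper for 8 endpoints)
│   └── statemachine/    # StateValidator, status enums
├── module/
│   ├── user/
│   │   ├── controller/  # AuthController, UserController
│   │   ├── service/     # AuthService, UserService
│   │   ├── mapper/      # SysUserMapper, SysCompanyMapper
│   │   ├── entity/      # SysUser, SysCompany
│   │   └── dto/         #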
│   │                    # LoginRequest, UserDTO, UserCreateRequest
│   ├── source/
│   │   ├── controller/  # SourceController
│   │   ├── service/     # SourceService
│   │   ├── mapper/      # SourceDataMapper
│   │   ├── entity/      # SourceData
│   │   └── dto/         # SourceUploadResponse, SourceListQuery
│   ├── task/
│   │   ├── controller/  # TaskController
│   │   ├── service/     # TaskService, TaskClaimService
│   │   ├── mapper/      # AnnotationTaskMapper, TaskHistoryMapper
│   │   ├── entity/      # AnnotationTask, AnnotationTaskHistory
│   │   └── dto/         # TaskCreateRequest, TaskPoolQuery, TaskDetailDTO
│   ├── annotation/
│   │   ├── controller/  # ExtractionController, QaController
│   │   ├── service/     # ExtractionService, QaService
│   │   ├── mapper/      # AnnotationResultMapper, TrainingDatasetMapper
│   │   ├── entity/      # AnnotationResult, TrainingDataset
│   │   └── dto/         # ExtractionResultDTO, QaItemDTO, RejectRequest
│   ├── export/
│   │   ├── controller/  # ExportController
│   │   ├── service/     # ExportService, FinetuneService
│   │   ├── mapper/      # ExportBatchMapper
│   │   ├── entity/      # ExportBatch
│   │   └── dto/         # ExportBatchRequest, FinetuneRequest
│   ├── config/
│   │   ├── controller/  # SysConfigController
│   │   ├── service/     # SysConfigService
│   │   ├── mapper/      # SysConfigMapper
│   │   ├── entity/      # SysConfig
│   │   └── dto/         # ConfigUpdateRequest
│   └── video/
│       ├── controller/  # VideoController
│       ├── service/     # VideoProcessService
│       ├── mapper/      # VideoProcessJobMapper
│       ├── entity/      # VideoProcessJob
│       └── dto/         # VideoProcessRequest, VideoCallbackRequest
```

---

## 2. Database DDL

> Execution order: sys_company → sys_user → source_data → annotation_task → annotation_result → training_dataset → export_batch → sys_config → sys_operation_log → annotation_task_history → video_process_job

```sql
-- =============================================
-- 1.
-- sys_company: company table (multi-tenant root)
-- =============================================
CREATE TABLE sys_company (
    id           BIGSERIAL    PRIMARY KEY,
    company_name VARCHAR(100) NOT NULL UNIQUE,
    company_code VARCHAR(50)  NOT NULL UNIQUE,
    status       VARCHAR(10)  NOT NULL DEFAULT 'ACTIVE',  -- ACTIVE / DISABLED
    created_at   TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at   TIMESTAMP    NOT NULL DEFAULT NOW()
);

-- =============================================
-- 2. sys_user: user table
-- =============================================
CREATE TABLE sys_user (
    id            BIGSERIAL    PRIMARY KEY,
    company_id    BIGINT       NOT NULL REFERENCES sys_company(id),
    username      VARCHAR(50)  NOT NULL,
    password_hash VARCHAR(255) NOT NULL,                   -- BCrypt, cost factor >= 10
    real_name     VARCHAR(50),
    role          VARCHAR(20)  NOT NULL,                   -- UPLOADER / ANNOTATOR / REVIEWER / ADMIN
    status        VARCHAR(10)  NOT NULL DEFAULT 'ACTIVE',
    created_at    TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMP    NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_company_username UNIQUE (company_id, username)
);
CREATE INDEX idx_sys_user_company ON sys_user(company_id);

-- =============================================
-- 3.
-- source_data: raw source metadata table
-- =============================================
CREATE TABLE source_data (
    id               BIGSERIAL    PRIMARY KEY,
    company_id       BIGINT       NOT NULL REFERENCES sys_company(id),
    uploader_id      BIGINT       REFERENCES sys_user(id),
    data_type        VARCHAR(20)  NOT NULL,                 -- TEXT / IMAGE / VIDEO
    file_path        VARCHAR(500) NOT NULL,
    file_name        VARCHAR(255) NOT NULL,
    file_size        BIGINT,
    bucket_name      VARCHAR(100) NOT NULL,
    parent_source_id BIGINT       REFERENCES source_data(id), -- points to the original video for video-to-text sources
    status           VARCHAR(20)  NOT NULL DEFAULT 'PENDING',
        -- PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED / REJECTED
    reject_reason    TEXT,
    created_at       TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMP    NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_source_data_company ON source_data(company_id);
CREATE INDEX idx_source_data_status  ON source_data(company_id, status);
CREATE INDEX idx_source_data_parent  ON source_data(parent_source_id);

-- =============================================
-- 4. annotation_task: annotation task table
-- =============================================
CREATE TABLE annotation_task (
    id              BIGSERIAL   PRIMARY KEY,
    company_id      BIGINT      NOT NULL REFERENCES sys_company(id),
    source_id       BIGINT      NOT NULL REFERENCES source_data(id),
    phase           VARCHAR(20) NOT NULL,                   -- EXTRACTION / QA_GENERATION
    task_type       VARCHAR(20) NOT NULL,                   -- AI_ASSISTED / MANUAL
    ai_model        VARCHAR(50),
    video_unit_type VARCHAR(20),                            -- FRAME (set only in video frame mode, otherwise NULL)
    video_unit_info JSONB,                                  -- {"frame_index":150,"time_sec":5.0,"frame_path":"..."}
    claimed_by      BIGINT      REFERENCES sys_user(id),
    claimed_at      TIMESTAMP,
    status          VARCHAR(20) NOT NULL DEFAULT 'UNCLAIMED',
        -- UNCLAIMED / IN_PROGRESS / SUBMITTED / APPROVED / REJECTED
    reject_reason   TEXT,
    submitted_at    TIMESTAMP,
    completed_at    TIMESTAMP,
    created_at      TIMESTAMP   NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMP   NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_annotation_task_company ON annotation_task(company_id);
CREATE INDEX idx_annotation_task_pool    ON annotation_task(company_id, phase, status);
CREATE INDEX
    idx_annotation_task_claimed ON annotation_task(claimed_by, status);

-- =============================================
-- 5. annotation_result: annotation result table (EXTRACTION phase, aggregated as JSONB)
-- =============================================
CREATE TABLE annotation_result (
    id           BIGSERIAL PRIMARY KEY,
    company_id   BIGINT    NOT NULL REFERENCES sys_company(id),
    task_id      BIGINT    NOT NULL REFERENCES annotation_task(id),
    result_json  JSONB     NOT NULL,                        -- replaced as a whole; partial PATCH is forbidden
    is_final     BOOLEAN   NOT NULL DEFAULT FALSE,
    submitted_by BIGINT    REFERENCES sys_user(id),
    created_at   TIMESTAMP NOT NULL DEFAULT NOW(),
    updated_at   TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_annotation_result_task  ON annotation_result(task_id);
CREATE INDEX idx_annotation_result_final ON annotation_result(company_id, is_final);

-- =============================================
-- 6. training_dataset: training sample table (final state of QA pairs)
-- =============================================
CREATE TABLE training_dataset (
    id                   BIGSERIAL   PRIMARY KEY,
    company_id           BIGINT      NOT NULL REFERENCES sys_company(id),
    task_id              BIGINT      NOT NULL REFERENCES annotation_task(id),
    source_id            BIGINT      NOT NULL REFERENCES source_data(id),
    extraction_result_id BIGINT      NOT NULL REFERENCES annotation_result(id),
    sample_type          VARCHAR(20) NOT NULL,              -- TEXT / IMAGE / VIDEO_FRAME
    glm_format_json      JSONB       NOT NULL,              -- GLM fine-tuning format; JSONB allows field-level queries
    export_batch_id      VARCHAR(50),                       -- NULL until exported
    status               VARCHAR(20) NOT NULL DEFAULT 'PENDING_REVIEW',
        -- PENDING_REVIEW / APPROVED / REJECTED
    reject_reason        TEXT,
    reviewed_by          BIGINT      REFERENCES sys_user(id),
    exported_at          TIMESTAMP,
    created_at           TIMESTAMP   NOT NULL DEFAULT NOW(),
    updated_at           TIMESTAMP   NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_training_dataset_company ON training_dataset(company_id);
CREATE INDEX idx_training_dataset_status  ON training_dataset(company_id, status);
CREATE INDEX idx_training_dataset_batch   ON training_dataset(export_batch_id);

-- =============================================
-- 7.
-- export_batch: export batch table
-- =============================================
CREATE TABLE export_batch (
    id                BIGSERIAL    PRIMARY KEY,
    company_id        BIGINT       NOT NULL REFERENCES sys_company(id),
    batch_uuid        VARCHAR(50)  NOT NULL UNIQUE,
    dataset_file_path VARCHAR(500),
    sample_count      INT          NOT NULL DEFAULT 0,
    glm_job_id        VARCHAR(100),                         -- filled in after the fine-tuning API is called; initially NULL
    finetune_status   VARCHAR(20)  NOT NULL DEFAULT 'NOT_STARTED',
        -- NOT_STARTED / RUNNING / SUCCESS / FAILED
    error_message     TEXT,
    created_by        BIGINT       REFERENCES sys_user(id),
    created_at        TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at        TIMESTAMP    NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_export_batch_company ON export_batch(company_id);

-- =============================================
-- 8. sys_config: system configuration table (global defaults + company-level overrides)
-- =============================================
CREATE TABLE sys_config (
    id           BIGSERIAL    PRIMARY KEY,
    company_id   BIGINT       REFERENCES sys_company(id),   -- NULL = global default
    config_key   VARCHAR(100) NOT NULL,
    config_value TEXT         NOT NULL,
    description  TEXT,
    updated_by   BIGINT       REFERENCES sys_user(id),
    updated_at   TIMESTAMP    NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_config_company_key UNIQUE (company_id, config_key)
);
-- A UNIQUE constraint treats NULLs as distinct, so the constraint above does not stop
-- duplicate global rows (company_id IS NULL). This partial index closes that gap.
CREATE UNIQUE INDEX uq_config_global_key ON sys_config(config_key) WHERE company_id IS NULL;

-- Seed global configuration entries
INSERT INTO sys_config (company_id, config_key, config_value, description) VALUES
    (NULL, 'prompt_extract_text',  '...', 'Prompt template for text triple extraction'),
    (NULL, 'prompt_extract_image', '...', 'Prompt template for image quadruple extraction'),
    (NULL, 'prompt_video_to_text', '...', 'Prompt template for video-to-text conversion'),
    (NULL, 'prompt_qa_gen_text',   '...', 'Prompt template for text QA pair generation'),
    (NULL, 'prompt_qa_gen_image',  '...', 'Prompt template for image QA pair generation'),
    (NULL, 'model_default',        'glm-4', 'Default AI-assist model'),
    (NULL, 'video_frame_interval', '30',    'Frame-extraction interval in video frame mode (in frames)'),
    (NULL, 'token_ttl_seconds',    '7200',  'Token lifetime (seconds)'),
    (NULL, 'glm_api_base_url',     'http://ai-service:8000', 'GLM API base URL');

-- =============================================
-- 9.
-- sys_operation_log: operation audit log (append-only, partitioned by month)
-- =============================================
CREATE TABLE sys_operation_log (
    id             BIGSERIAL,
    company_id     BIGINT      REFERENCES sys_company(id),
    operator_id    BIGINT      REFERENCES sys_user(id),     -- may be NULL on login failure
    operator_name  VARCHAR(50) NOT NULL,                    -- snapshot of the username at operation time
    operation_type VARCHAR(50) NOT NULL,
    target_type    VARCHAR(30),
    target_id      BIGINT,
    detail         JSONB,
    ip_address     VARCHAR(50),
    result         VARCHAR(10) NOT NULL,                    -- SUCCESS / FAIL
    error_message  TEXT,
    created_at     TIMESTAMP   NOT NULL DEFAULT NOW(),
    -- PostgreSQL requires a primary key on a partitioned table to include
    -- every partition key column, so created_at is part of the key.
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Initial partitions (pg_partman is recommended for automatic maintenance)
CREATE TABLE sys_operation_log_2026_04 PARTITION OF sys_operation_log
    FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
CREATE TABLE sys_operation_log_2026_05 PARTITION OF sys_operation_log
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');

-- =============================================
-- 10. annotation_task_history: task transition history (append-only)
-- =============================================
CREATE TABLE annotation_task_history (
    id            BIGSERIAL   PRIMARY KEY,
    company_id    BIGINT      NOT NULL REFERENCES sys_company(id),
    task_id       BIGINT      NOT NULL REFERENCES annotation_task(id),
    from_status   VARCHAR(20),                              -- NULL when the task is first created
    to_status     VARCHAR(20) NOT NULL,
    operator_id   BIGINT      NOT NULL REFERENCES sys_user(id),
    operator_role VARCHAR(20) NOT NULL,                     -- snapshot of the role at operation time
    note          TEXT,                                     -- rejection reason, forced-reassignment note, etc.
    created_at    TIMESTAMP   NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_task_history_task ON annotation_task_history(task_id);

-- =============================================
-- 11.
-- video_process_job: asynchronous video processing job table
-- =============================================
CREATE TABLE video_process_job (
    id              BIGSERIAL    PRIMARY KEY,
    company_id      BIGINT       NOT NULL REFERENCES sys_company(id),
    source_id       BIGINT       NOT NULL REFERENCES source_data(id),
    job_type        VARCHAR(20)  NOT NULL,                  -- FRAME_EXTRACT / VIDEO_TO_TEXT
    status          VARCHAR(20)  NOT NULL DEFAULT 'PENDING',
        -- PENDING / RUNNING / SUCCESS / FAILED / RETRYING
    params          JSONB        NOT NULL,
    total_units     INT,
    processed_units INT          NOT NULL DEFAULT 0,
    output_path     VARCHAR(500),
    retry_count     INT          NOT NULL DEFAULT 0,
    max_retries     INT          NOT NULL DEFAULT 3,
    error_message   TEXT,
    started_at      TIMESTAMP,
    completed_at    TIMESTAMP,
    created_at      TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMP    NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_video_process_job_source ON video_process_job(source_id);
CREATE INDEX idx_video_process_job_status ON video_process_job(status);
```

---

## 3. Shared Infrastructure

### 3.1 Unified Response Wrapper

```java
// com/label/common/result/Result.java
public record Result<T>(String code, T data, String message) {
    public static <T> Result<T> ok(T data) { return new Result<>("SUCCESS", data, null); }
    public static <T> Result<T> ok() { return new Result<>("SUCCESS", null, null); }
    public static <T> Result<T> fail(String code, String message) { return new Result<>(code, null, message); }
}

// com/label/common/result/PageResult.java
public record PageResult<T>(List<T> items, long total, int page, int pageSize) {}
```

Every Controller returns `Result<T>`; returning a bare POJO or a bare List is forbidden. Paginated endpoints return `Result<PageResult<T>>`.

### 3.2 Global Exception Handling

```java
// com/label/common/exception/GlobalExceptionHandler.java
import jakarta.servlet.http.HttpServletRequest;
import org.apache.shiro.authz.UnauthorizedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

@RestControllerAdvice
public class GlobalExceptionHandler {
    private static final Logger log = LoggerFactory.getLogger(GlobalExceptionHandler.class);

    // Business errors carry their own code and HTTP status
    @ExceptionHandler(BusinessException.class)
    public ResponseEntity<Result<Void>> handleBusiness(BusinessException ex) {
        return ResponseEntity.status(ex.getHttpStatus())
            .body(Result.fail(ex.getCode(), ex.getMessage()));
    }

    // Thrown by Shiro when a @RequiresRoles check fails
    @ExceptionHandler(UnauthorizedException.class)
    public ResponseEntity<Result<Void>> handleUnauthorized() {
        return ResponseEntity.status(403)
            .body(Result.fail("FORBIDDEN", "Insufficient permissions"));
    }
    // Catch-all: log the full stack trace, but return only a generic message
    @ExceptionHandler(Exception.class)
    public ResponseEntity<Result<Void>> handleUnexpected(Exception ex, HttpServletRequest req) {
        log.error("Unexpected error on {}", req.getRequestURI(), ex);
        // Production responses must never contain a stack trace
        return ResponseEntity.status(500)
            .body(Result.fail("INTERNAL_ERROR", "Internal server error"));
    }
}
```

### 3.3 Multi-tenant CompanyContext (ThreadLocal)

```java
// com/label/common/context/CompanyContext.java
public final class CompanyContext {
    private static final ThreadLocal<Long> COMPANY_ID = new ThreadLocal<>();

    private CompanyContext() {}

    public static void set(Long companyId) { COMPANY_ID.set(companyId); }
    public static Long get() { return COMPANY_ID.get(); }
    public static void clear() { COMPANY_ID.remove(); }  // must be called in a finally block
}
```

The context is set in the Shiro `TokenFilter` once `executeLogin` succeeds, and `CompanyContext.clear()` is called in the filter's `finally` block, so that a pooled thread reused by a later request cannot leak one tenant's context to another.

Callers are **forbidden** from passing `company_id` as a request-body parameter. Every Service obtains it through `CompanyContext.get()`.

MyBatis Plus is configured with `TenantLineInnerInterceptor`, which automatically injects a `company_id = ?` condition into every Mapper query:

```java
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(
        new TenantLineHandler() {
            // Unboxing throws a NullPointerException if no tenant context is set,
            // which is the case on the anonymous login path
            @Override
            public Expression getTenantId() { return new LongValue(CompanyContext.get()); }

            @Override
            public String getTenantIdColumn() { return "company_id"; }

            // sys_company has no company_id column, so it must be excluded from tenant filtering
            @Override
            public boolean ignoreTable(String tableName) {
                return "sys_company".equalsIgnoreCase(tableName);
            }
        }
    ));
    return interceptor;
}
```

### 3.4 Shiro Configuration

#### TokenFilter (extends AuthenticatingFilter)

```java
// com/label/common/shiro/TokenFilter.java
// Injected dependencies (sysConfigService, redisTemplate) and the
// sendUnauthorized helper are omitted from this listing.
public class TokenFilter extends AuthenticatingFilter {

    @Override
    protected AuthenticationToken createToken(ServletRequest req, ServletResponse res) {
        String token = extractToken((HttpServletRequest) req);
        return new TokenAuthenticationToken(token);
    }

    @Override
    protected boolean onAccessDenied(ServletRequest req, ServletResponse res) throws Exception {
        String token = extractToken((HttpServletRequest) req);
        if (token == null) {
            sendUnauthorized(res, "Missing token");
            return false;
        }
        return executeLogin(req, res);
    }

    @Override
    protected
    boolean onLoginSuccess(AuthenticationToken token, Subject subject,
                           ServletRequest req, ServletResponse res) throws Exception {
        UserSession session = (UserSession) subject.getPrincipal();
        // Sliding expiration: every valid request resets the TTL
        long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
        redisTemplate.expire(RedisKeyManager.tokenKey(session.getToken()), ttl, TimeUnit.SECONDS);
        // Set the tenant context. It stays valid for the whole filter chain
        // and is cleared by the finally block in doFilterInternal.
        CompanyContext.set(session.getCompanyId());
        return true;  // continue the filter chain (on to the Controller)
    }

    // ⚠️ The ThreadLocal must be cleared here, not in onLoginSuccess:
    // the filter chain only continues after onLoginSuccess returns true, so the Controller has not run yet.
    // The finally block guarantees cleanup when the request ends, whether normally or with an exception.
    // Declared public because Shiro's AdviceFilter declares doFilterInternal as public;
    // an override cannot narrow that to protected.
    @Override
    public void doFilterInternal(ServletRequest req, ServletResponse res, FilterChain chain)
            throws ServletException, IOException {
        try {
            super.doFilterInternal(req, res, chain);
        } finally {
            CompanyContext.clear();
        }
    }

    // Returns the bearer token from the Authorization header, or null if absent
    private String extractToken(HttpServletRequest req) {
        String header = req.getHeader("Authorization");
        if (header != null && header.startsWith("Bearer ")) {
            return header.substring(7);
        }
        return null;
    }
}
```

#### UserRealm (extends AuthorizingRealm)

```java
// com/label/common/shiro/UserRealm.java
public class UserRealm extends AuthorizingRealm {

    // Authentication: check in Redis that token:{uuid} exists
    @Override
    protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) {
        String uuid = ((TokenAuthenticationToken) token).getToken();
        Map<Object, Object> data = redisTemplate.opsForHash()
            .entries(RedisKeyManager.tokenKey(uuid));
        if (data.isEmpty()) throw new UnknownAccountException("Token is invalid or has expired");
        UserSession session = UserSession.from(uuid, data);
        return new SimpleAuthenticationInfo(session, uuid, getName());
    }

    // Authorization: look up Redis user:perm:{userId} first, fall back to PostgreSQL on a miss
    @Override
    protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
        UserSession session = (UserSession) principals.getPrimaryPrincipal();
        String permKey = RedisKeyManager.userPermKey(session.getUserId());
        String cachedRole
            = (String) redisTemplate.opsForValue().get(permKey);
        if (cachedRole == null) {
            SysUser user = sysUserMapper.selectById(session.getUserId());
            cachedRole = user.getRole();
            redisTemplate.opsForValue().set(permKey, cachedRole, 5, TimeUnit.MINUTES);
        }
        SimpleAuthorizationInfo info = new SimpleAuthorizationInfo();
        info.addRole(cachedRole);
        // Role inheritance: a higher role includes the permissions of every lower role
        addInheritedRoles(info, cachedRole);
        return info;
    }

    private void addInheritedRoles(SimpleAuthorizationInfo info, String role) {
        // ADMIN ⊃ REVIEWER ⊃ ANNOTATOR ⊃ UPLOADER
        switch (role) {
            case "ADMIN"     -> info.addRoles(Set.of("REVIEWER", "ANNOTATOR", "UPLOADER"));
            case "REVIEWER"  -> info.addRoles(Set.of("ANNOTATOR", "UPLOADER"));
            case "ANNOTATOR" -> info.addRoles(Set.of("UPLOADER"));
            default          -> { }  // UPLOADER inherits nothing
        }
    }
}
```

#### ShiroConfig Filter Chain

```java
@Bean
public ShiroFilterChainDefinition shiroFilterChainDefinition() {
    DefaultShiroFilterChainDefinition chain = new DefaultShiroFilterChainDefinition();
    chain.addPathDefinition("/api/auth/login", "anon");  // login is the only anonymous endpoint
    chain.addPathDefinition("/api/**", "tokenFilter");
    return chain;
}
```

Permissions are declared with annotations. Ad-hoc if-role checks inside a Service are forbidden:

```java
@RequiresRoles("ADMIN")
public void createTask(...)

@RequiresRoles(value = {"ANNOTATOR", "REVIEWER", "ADMIN"}, logical = Logical.OR)
public void claimTask(...)
```

#### Evict the Cache Immediately on Role Change

```java
// Inside UserService
@Transactional
public void updateRole(Long userId, String newRole) {
    userMapper.updateRole(userId, newRole);
    // Evict immediately rather than waiting for the TTL to expire
    // (a delay leaves a security window when a high-privilege account is downgraded or disabled)
    redisTemplate.delete(RedisKeyManager.userPermKey(userId));
}
```

### 3.5 Redis Key Management

```java
// com/label/common/redis/RedisKeyManager.java
public final class RedisKeyManager {
    public static String tokenKey(String uuid)     { return "token:" + uuid; }
    public static String userPermKey(Long userId)  { return "user:perm:" + userId; }
    public static String taskClaimKey(Long taskId) { return "task:claim:" + taskId; }
}
```

Creating keys outside these three namespaces for authentication, permission, or locking purposes is forbidden.

### 3.6 AOP Audit Log Aspect

```java
// com/label/common/aop/OperationLog.java (annotation)
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface OperationLog {
    String type();                   // maps to an operation_type enum value, e.g. "TASK_CLAIM"
    String targetType() default "";
}

// com/label/common/aop/AuditAspect.java
@Aspect
@Component
public class AuditAspect {
    private static final Logger log = LoggerFactory.getLogger(AuditAspect.class);

    @Around("@annotation(operationLog)")
    public Object audit(ProceedingJoinPoint pjp, OperationLog operationLog) throws Throwable {
        String result = "SUCCESS";
        String errorMsg = null;
        Object returnVal = null;
        try {
            returnVal = pjp.proceed();
        } catch (Throwable ex) {
            result = "FAIL";
            errorMsg = ex.getMessage();
            throw ex;
        } finally {
            // The audit record is written as a separate operation after the business
            // transaction has committed or rolled back. This holds only if this aspect
            // wraps the transaction advice, i.e. has higher precedence than it.
            // A failed audit write is only logged at error level; it must never roll back the business transaction.
            try {
                saveAuditLog(operationLog.type(), operationLog.targetType(), result, errorMsg);
            } catch (Exception auditEx) {
                log.error("Failed to write audit log: type={}", operationLog.type(), auditEx);
            }
        }
        return returnVal;
    }

    private void saveAuditLog(String type, String targetType, String result, String errorMsg) {
        UserSession session = getCurrentSession();
        // Note: this local variable shadows the logger field within this method
        SysOperationLog log = SysOperationLog.builder()
            .companyId(CompanyContext.get())
            .operatorId(session != null ? session.getUserId() : null)
            .operatorName(session != null ? session.getUsername() : "unknown")
            .operationType(type)
            .targetType(targetType.isEmpty() ?
                null : targetType)
            .result(result)
            .errorMessage(errorMsg)
            .ipAddress(getClientIp())
            .build();
        operationLogMapper.insert(log);
    }
}
```

### 3.7 RustFS S3 Client

RustFS implements an S3-compatible API, so the backend connects to it with the AWS SDK for Java v2:

```java
// com/label/common/storage/RustFsClient.java
// Constructor wiring of s3Client and endpointUri is omitted from this listing.
@Component
public class RustFsClient {
    private final S3Client s3Client;  // configured with an endpoint override pointing at RustFS
    private final URI endpointUri;    // the same RustFS endpoint, reused for presigning

    public String upload(String bucket, String key, InputStream data, long size) {
        PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).build();
        s3Client.putObject(req, RequestBody.fromInputStream(data, size));
        return key;
    }

    // The caller is responsible for closing the returned stream
    public InputStream download(String bucket, String key) {
        GetObjectRequest req = GetObjectRequest.builder().bucket(bucket).key(key).build();
        return s3Client.getObject(req);
    }

    public void delete(String bucket, String key) {
        s3Client.deleteObject(b -> b.bucket(bucket).key(key));
    }

    public String getPresignedUrl(String bucket, String key, Duration expiry) {
        // S3Presigner is closeable: close it after use (or share one instance) to avoid leaking resources
        try (S3Presigner presigner = S3Presigner.builder().endpointOverride(endpointUri).build()) {
            return presigner.presignGetObject(b -> b.signatureDuration(expiry)
                .getObjectRequest(r -> r.bucket(bucket).key(key))).url().toString();
        }
    }
}
```

**Object storage path conventions:**

| Resource type | Bucket | Path format |
|----------|--------|----------|
| Uploaded text file | `source-data` | `text/{yyyyMM}/{source_id}.txt` |
| Uploaded image | `source-data` | `image/{yyyyMM}/{source_id}.jpg` |
| Uploaded video | `source-data` | `video/{yyyyMM}/{source_id}.mp4` |
| Video frame image | `source-data` | `frames/{source_id}/{frame_index}.jpg` |
| Text transcribed from a video segment | `source-data` | `video-text/{parent_source_id}/{timestamp}.txt` |
| bbox cropped image | `source-data` | `crops/{task_id}/{item_index}.jpg` |
| Exported JSONL | `finetune-export` | `export/{batchUuid}.jsonl` |

### 3.8 AI Service HTTP Client

On Spring Boot 3, `RestClient` wraps the synchronous HTTP calls to the Python FastAPI service:

```java
// com/label/common/ai/AiServiceClient.java
// Note: Map.of rejects null values, so callers must pass non-null arguments (including model).
@Component
public class AiServiceClient {
    private final RestClient restClient;

    public AiServiceClient(@Value("${ai.service.base-url}") String baseUrl) {
        this.restClient = RestClient.builder().baseUrl(baseUrl).build();
    }

    // POST /api/v1/text/extract
    public TextExtractResponse extractText(String filePath, String model) {
        return restClient.post().uri("/api/v1/text/extract")
            .body(Map.of("file_path", filePath, "model", model))
            .retrieve().body(TextExtractResponse.class);
    }

    // POST /api/v1/image/extract
    public ImageExtractResponse extractImage(String imagePath, String model) {
        return restClient.post().uri("/api/v1/image/extract")
            .body(Map.of("image_path", imagePath, "model", model))
            .retrieve().body(ImageExtractResponse.class);
    }

    // POST /api/v1/video/extract-frames
    public VideoFramesResponse extractFrames(String videoPath, String frameInterval, String mode) {
        return restClient.post().uri("/api/v1/video/extract-frames")
            .body(Map.of("video_path", videoPath, "frame_interval", frameInterval, "mode", mode))
            .retrieve().body(VideoFramesResponse.class);
    }

    // POST /api/v1/video/to-text
    public VideoToTextResponse videoToText(String videoPath, int startSec, int endSec, String model) {
        return restClient.post().uri("/api/v1/video/to-text")
            .body(Map.of("video_path", videoPath, "start_sec", startSec,
                         "end_sec", endSec, "model", model))
            .retrieve().body(VideoToTextResponse.class);
    }

    // POST /api/v1/qa/gen-text
    public QaGenResponse genTextQa(List<?> triples, String model, String promptTemplate) {
        return restClient.post().uri("/api/v1/qa/gen-text")
            .body(Map.of("items", triples, "model", model, "prompt_template", promptTemplate))
            .retrieve().body(QaGenResponse.class);
    }

    // POST /api/v1/qa/gen-image
    public QaGenResponse genImageQa(List<?> quadruples, String model, String promptTemplate) {
        return restClient.post().uri("/api/v1/qa/gen-image")
            .body(Map.of("items", quadruples, "model", model, "prompt_template", promptTemplate))
            .retrieve().body(QaGenResponse.class);
    }

    // POST /api/v1/finetune/start
    public FinetuneStartResponse startFinetune(String jsonlUrl, String baseModel, Map<String, Object> params) {
        return restClient.post().uri("/api/v1/finetune/start")
            .body(Map.of("jsonl_url", jsonlUrl, "base_model", baseModel, "params", params))
            .retrieve().body(FinetuneStartResponse.class);
    }

    // GET /api/v1/finetune/status/{jobId}
    public FinetuneStatusResponse getFinetuneStatus(String jobId) {
        return restClient.get().uri("/api/v1/finetune/status/{jobId}", jobId)
            .retrieve().body(FinetuneStatusResponse.class);
    }
}
```

---

## 4. Business Module Vertical Slices

### 4.1 User and Permission Module

**Entity (SysUser)**: fields match the DDL. The `passwordHash` field is annotated with `@JsonIgnore` so that it is never serialized into a response.

**AuthService core logic:**

```java
// Login
@OperationLog(type = "USER_LOGIN")
public String login(Long companyId, String username, String password, String ip) {
    SysUser user = userMapper.selectByCompanyAndUsername(companyId, username);
    // Same error for an unknown user and a wrong password, so usernames cannot be probed
    if (user == null || !BCrypt.checkpw(password, user.getPasswordHash()))
        throw new BusinessException("USER_NOT_FOUND", "Incorrect username or password");
    if ("DISABLED".equals(user.getStatus()))
        throw new BusinessException("USER_DISABLED", "Account is disabled");

    String token = UUID.randomUUID().toString();
    Map<String, Object> tokenData = Map.of(
        "userId", user.getId(),
        "role", user.getRole(),
        "companyId", user.getCompanyId(),
        "username", user.getUsername()
    );
    long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
    redisTemplate.opsForHash().putAll(RedisKeyManager.tokenKey(token), tokenData);
    redisTemplate.expire(RedisKeyManager.tokenKey(token), ttl, TimeUnit.SECONDS);
    return token;
}

// Logout
@OperationLog(type = "USER_LOGOUT")
public void logout(String token) {
    redisTemplate.delete(RedisKeyManager.tokenKey(token));
}
```

**API list:**

| Method | Path | Minimum role | Description |
|------|------|----------|------|
| POST | `/api/auth/login` | Anonymous | Returns a token (UUID) |
| POST | `/api/auth/logout` | Logged in | Deletes the token from Redis |
| GET | `/api/auth/me` | Logged in | Current user's info and role |
| GET | `/api/users` | ADMIN | Paginated user list |
| POST | `/api/users` | ADMIN | Create a user |
| PUT | `/api/users/{id}` | ADMIN | Update user info |
| PUT | `/api/users/{id}/status` | ADMIN | Enable / disable an account (also evicts the permission cache) |
| PUT | `/api/users/{id}/role` | ADMIN | Change role (also evicts the permission cache) |

---

### 4.2 Source Data Management Module

**SourceService core logic:**

```java
@OperationLog(type = "SOURCE_UPLOAD")
@Transactional
public SourceData upload(MultipartFile file, String dataType) throws IOException {
    Long companyId = CompanyContext.get();
    // 1. Create the source_data record first to obtain its ID (needed to build the storage path).
    //    file_path is NOT NULL in the DDL, so insert an empty placeholder and fill it in at step 3.
    SourceData sd = SourceData.builder()
        .companyId(companyId).uploaderId(getCurrentUserId())
        .dataType(dataType).fileName(file.getOriginalFilename())
        .fileSize(file.getSize()).bucketName("source-data")
        .filePath("").status("PENDING").build();
    sourceDataMapper.insert(sd);

    // 2. Build the path and upload to RustFS
    String path = buildPath(dataType, sd.getId());
    rustFsClient.upload("source-data", path, file.getInputStream(), file.getSize());

    // 3. Update file_path
    sd.setFilePath(path);
    sourceDataMapper.updateById(sd);
    return sd;
}

private String buildPath(String dataType, Long sourceId) {
    String ym = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
    return switch (dataType) {
        case "TEXT"  -> "text/"  + ym + "/" + sourceId + ".txt";
        case "IMAGE" -> "image/" + ym + "/" + sourceId + ".jpg";
        case "VIDEO" -> "video/" + ym + "/" + sourceId + ".mp4";
        default -> throw new BusinessException("INVALID_TYPE", "Unsupported source data type");
    };
}
```

**API list:**

| Method | Path | Minimum role | Description |
|------|------|----------|------|
| POST | `/api/source/upload` | UPLOADER | Upload a file and create a source_data record |
| GET | `/api/source/list` | UPLOADER | Paginated query (an UPLOADER sees only their own uploads, an ADMIN sees all) |
| GET | `/api/source/{id}` | UPLOADER | View source data details |
| DELETE | `/api/source/{id}` | ADMIN | Delete the source data and its RustFS file |

---

### 4.3 Task Management Module (with Concurrency Control)

**TaskClaimService.claim(): two layers of protection**

```java
@Transactional
@OperationLog(type = "TASK_CLAIM")
public void claim(Long taskId) {
    Long userId = getCurrentUserId();

    // Layer 1: Redis distributed lock (SET NX, TTL 30s)
    Boolean locked = redisTemplate.opsForValue()
        .setIfAbsent(RedisKeyManager.taskClaimKey(taskId), userId.toString(), 30, TimeUnit.SECONDS);
    if (!Boolean.TRUE.equals(locked))
        throw new BusinessException("TASK_CLAIMED", "Task has already been claimed by someone else");

    // Layer 2: optimistic constraint in the database (WHERE status = 'UNCLAIMED')
    // UPDATE annotation_task SET status='IN_PROGRESS', claimed_by=?, claimed_at=NOW()
    // WHERE id=?
    //   AND status='UNCLAIMED' AND company_id=?
    int affected = taskMapper.claimTask(taskId, userId, CompanyContext.get());
    if (affected == 0)
        throw new BusinessException("TASK_CLAIMED", "Task has already been claimed by someone else");

    // Write the task history (same transaction)
    insertHistory(taskId, "UNCLAIMED", "IN_PROGRESS", userId, null);
}

// @Transactional so that the status change and the history record commit together
@Transactional
public void unclaim(Long taskId) {
    StateValidator.assertTransition(TaskStatus.IN_PROGRESS, TaskStatus.UNCLAIMED, TaskStatus.TRANSITIONS);
    // Set claimed_by = NULL, status = UNCLAIMED
    taskMapper.unclaim(taskId, getCurrentUserId(), CompanyContext.get());
    redisTemplate.delete(RedisKeyManager.taskClaimKey(taskId));
    insertHistory(taskId, "IN_PROGRESS", "UNCLAIMED", getCurrentUserId(), null);
}
```

**Every status change must call `insertHistory()` in the same transaction to write a row to `annotation_task_history`.**

**API list:**

| Method | Path | Minimum role | Description |
|------|------|----------|------|
| POST | `/api/tasks` | ADMIN | Create an EXTRACTION task for the given source |
| GET | `/api/tasks/pool` | ANNOTATOR | List claimable tasks (filtered by role, paginated). **Role filter rules**: an ANNOTATOR gets `phase=EXTRACTION AND status=UNCLAIMED`; a REVIEWER gets `phase=EXTRACTION AND status=UNCLAIMED` ∪ `phase=QA_GENERATION AND status=UNCLAIMED` (REVIEWER inherits ANNOTATOR permissions and can claim both kinds); an ADMIN sees tasks of every phase and status through `GET /api/tasks`. |
| GET | `/api/tasks/pending-review` | REVIEWER | **Review inbox**: returns tasks with `status=SUBMITTED` (paginated). A REVIEWER sees all pending tasks of their company; an ADMIN has the same access. An ANNOTATOR cannot call this endpoint (403). |
| POST | `/api/tasks/{id}/claim` | ANNOTATOR | Claim a task (first come, first served) |
| POST | `/api/tasks/{id}/unclaim` | ANNOTATOR | Give up a task and return it to the pool |
| POST | `/api/tasks/{id}/reclaim` | ANNOTATOR | **Pick a rejected task back up**: valid only when `task.status=REJECTED AND task.claimed_by=currentUserId`; sets status to IN_PROGRESS (state machine REJECTED → IN_PROGRESS) and writes the task history. |
| GET | `/api/tasks/mine` | ANNOTATOR | List the tasks I have claimed (paginated), **including REJECTED ones**, so annotators can find tasks that were sent back. |
| GET | `/api/tasks/{id}` | ANNOTATOR | View task details |
| GET | `/api/tasks` | ADMIN | Query all tasks (filterable, paginated) |
| PUT | `/api/tasks/{id}/reassign` | ADMIN | Force-reassign task ownership (updates claimed_by, task status stays IN_PROGRESS, and writes from=IN_PROGRESS / to=IN_PROGRESS + note=reassignment reason to annotation_task_history) |

---

### 4.4 Annotation Workbench Module (EXTRACTION Phase)

**ExtractionService core logic:**

```java
// AI-assisted pre-annotation (called after the annotator claims the task)
public AnnotationResult aiPreAnnotate(Long taskId) {
    AnnotationTask task = taskMapper.selectById(taskId);
    SourceData source = sourceDataMapper.selectById(task.getSourceId());

    // Images and video frames go to the image endpoint, everything else to the text endpoint
    boolean isImageLike = "IMAGE".equals(source.getDataType()) || task.getVideoUnitType() != null;
    Object aiResult = isImageLike
        ? aiServiceClient.extractImage(resolveImagePath(task), task.getAiModel())
        : aiServiceClient.extractText(source.getFilePath(), task.getAiModel());

    AnnotationResult result = buildAnnotationResult(task, aiResult);
    annotationResultMapper.insertOrReplace(result);
    return result;
}

// Update the extraction result (whole-document replacement, PUT semantics)
public void updateResult(Long taskId, String resultJsonStr) {
    // Validate that the JSON is well-formed
    validateResultJson(resultJsonStr);
    // Replace result_json as a whole; partial PATCH and item-by-item appends are forbidden
    annotationResultMapper.updateResultJson(taskId, resultJsonStr, CompanyContext.get());
}

// Approval uses a two-stage design:
// Stage 1 (synchronous, @Transactional): perform the core approval actions (steps 1-3) and return immediately
// Stage 2 (asynchronous event): the AI call and QA task creation (steps 4-7) are driven by ExtractionApprovedEvent
// ⚠️ Never put the AI HTTP call inside @Transactional: it would hold a DB connection for a long time,
//    and an AI failure would roll back the approval that has already been completed
// ⚠️ Self-review guard: the submitter (claimed_by) cannot also be the reviewer
@Transactional
@OperationLog(type = "EXTRACTION_APPROVE")
public void approve(Long taskId) {
    AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
    if (task.getClaimedBy().equals(getCurrentUserId()))
        throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "You cannot review a task you submitted");
    AnnotationResult result = annotationResultMapper.selectByTaskId(taskId);

    // 1. annotation_result.is_final = true
    result.setIsFinal(true);
    annotationResultMapper.updateById(result);

    // 2. annotation_task.status → APPROVED
    StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS);
    task.setStatus("APPROVED");
    task.setCompletedAt(LocalDateTime.now());
    taskMapper.updateById(task);

    // 3.
    // Write the task history
    insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);

    // ---- transaction boundary ----
    // Steps 4-7 run asynchronously after the transaction commits, via a Spring ApplicationEvent,
    // so that the AI HTTP call never blocks the transaction (see ExtractionApprovedEventListener)
    applicationEventPublisher.publishEvent(new ExtractionApprovedEvent(taskId, task.getSourceId()));
}

// ExtractionApprovedEventListener (@TransactionalEventListener(phase = AFTER_COMMIT), @Async)
// 4. Call the AI service to generate candidate QA pairs
//    String promptKey = "IMAGE".equals(getSourceType(task)) ? "prompt_qa_gen_image" : "prompt_qa_gen_text";
//    QaGenResponse qaResponse = generateQa(task, result, promptTemplate);
//
// 5. Write the candidate QA pairs to training_dataset (PENDING_REVIEW)
//
// 6. Create the QA_GENERATION phase task (UNCLAIMED)
//
// 7. source_data.status → QA_REVIEW
```

**ExtractionService.reject() core logic:**

```java
// Reject: the task moves to REJECTED; the annotator reclaims it
// (POST /api/tasks/{id}/reclaim) to bring it back to IN_PROGRESS
@Transactional
@OperationLog(type = "EXTRACTION_REJECT")
public void reject(Long taskId, String reason) {
    AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
    if (task.getClaimedBy().equals(getCurrentUserId())) {
        throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能驳回自己提交的任务");
    }

    StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS);
    task.setStatus("REJECTED");
    task.setRejectReason(reason);
    taskMapper.updateById(task);

    insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason);
    // ⚠️ source_data stays in EXTRACTING (no rollback) until the annotator reclaims and revises the task
}
```

> **Note on the source_data EXTRACTING status**: when a task is created (`POST /api/tasks`, ADMIN), the backend synchronously sets `source_data.status → EXTRACTING`. After an EXTRACTION rejection, source_data stays in EXTRACTING; the annotator reclaims the task, revises it and resubmits, and only an approval advances source_data to QA_REVIEW.

**Endpoint list:**

| Method | Path | Minimum role | Description |
|------|------|----------|------|
| GET | `/api/extraction/{taskId}` | ANNOTATOR | Get the current extraction result (including AI pre-annotation) |
| PUT | `/api/extraction/{taskId}` | ANNOTATOR | Update the extraction result (full JSONB overwrite) |
| POST | `/api/extraction/{taskId}/submit` | ANNOTATOR | Submit the extraction result into the review queue |
| POST | `/api/extraction/{taskId}/approve` | REVIEWER | Approve (self-review guard: a reviewer cannot approve their own submission); automatically triggers QA task creation |
| POST | `/api/extraction/{taskId}/reject` | REVIEWER | Reject (same self-review guard) with a reason; task → REJECTED, and the annotator must reclaim it |

---

### 4.5 QA Generation Module (QA_GENERATION Phase)

The full-overwrite logic of `QaService` matches `ExtractionService` (PUT semantics, partial PATCH forbidden).

**QaService.reject() core logic:**

```java
@Transactional
@OperationLog(type = "QA_REJECT")
public void reject(Long taskId, String reason) {
    AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
    if (task.getClaimedBy().equals(getCurrentUserId())) {
        throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能驳回自己提交的任务");
    }

    // training_dataset.status → REJECTED (goes back to PENDING_REVIEW once the annotator resubmits)
    trainingDatasetMapper.rejectByTaskId(taskId, reason, CompanyContext.get());

    StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS);
    task.setStatus("REJECTED");
    task.setRejectReason(reason);
    taskMapper.updateById(task);

    insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason);
    // source_data stays in QA_REVIEW until the annotator reclaims, revises and resubmits
}
```

**Cascading actions of approve (single transaction):**

```java
@Transactional
@OperationLog(type = "QA_APPROVE")
public void approve(Long taskId) {
    AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
    if (task.getClaimedBy().equals(getCurrentUserId())) {
        throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能审批自己提交的任务");
    }

    // 1. training_dataset.status → APPROVED
    trainingDatasetMapper.approveByTaskId(taskId, getCurrentUserId(), CompanyContext.get());

    // 2. annotation_task.status → APPROVED (validated like every other status change, see section 5)
    StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS);
    task.setStatus("APPROVED");
    task.setCompletedAt(LocalDateTime.now());
    taskMapper.updateById(task);

    // 3. source_data.status → APPROVED (the whole pipeline is complete)
    sourceDataMapper.updateStatus(task.getSourceId(), "APPROVED", CompanyContext.get());

    // 4.
    // Write the task history
    insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);
}
```

**Endpoint list:**

| Method | Path | Minimum role | Description |
|------|------|----------|------|
| GET | `/api/qa/{taskId}` | ANNOTATOR | Get the list of candidate QA pairs |
| PUT | `/api/qa/{taskId}` | ANNOTATOR | Edit the QA pairs (full overwrite) |
| POST | `/api/qa/{taskId}/submit` | ANNOTATOR | Submit the QA pairs into the review queue |
| POST | `/api/qa/{taskId}/approve` | REVIEWER | Approve (self-review guard); training_dataset → APPROVED, source_data → APPROVED |
| POST | `/api/qa/{taskId}/reject` | REVIEWER | Reject (self-review guard); training_dataset → REJECTED; the annotator must reclaim, revise and resubmit (after resubmission training_dataset → PENDING_REVIEW) |

---

### 4.6 Training Data Export Module

**ExportService.createBatch() core logic:**

```java
@Transactional
@OperationLog(type = "EXPORT_CREATE")
public ExportBatch createBatch(List<Long> sampleIds) {
    Long companyId = CompanyContext.get();

    // 1. Verify that every sample is in APPROVED status
    List<TrainingDataset> samples = trainingDatasetMapper
        .selectByIdsAndStatus(sampleIds, "APPROVED", companyId);
    if (samples.size() != sampleIds.size()) {
        throw new BusinessException("INVALID_SAMPLES", "部分样本不处于 APPROVED 状态");
    }

    // 2. Build the JSONL content (one glm_format_json per line)
    String batchUuid = UUID.randomUUID().toString();
    String jsonl = samples.stream()
        .map(s -> s.getGlmFormatJson().toString())
        .collect(Collectors.joining("\n"));

    // 3. Upload to RustFS: finetune-export/export/{batchUuid}.jsonl
    String path = "export/" + batchUuid + ".jsonl";
    byte[] bytes = jsonl.getBytes(StandardCharsets.UTF_8);
    rustFsClient.upload("finetune-export", path, new ByteArrayInputStream(bytes), bytes.length);

    // 4. Update export_batch_id and exported_at on the samples
    trainingDatasetMapper.batchUpdateExportInfo(sampleIds, batchUuid, LocalDateTime.now());

    // 5.
    // Insert the export_batch record
    ExportBatch batch = ExportBatch.builder()
        .companyId(companyId).batchUuid(batchUuid).datasetFilePath(path)
        .sampleCount(samples.size()).finetuneStatus("NOT_STARTED")
        .createdBy(getCurrentUserId()).build();
    exportBatchMapper.insert(batch);
    return batch;
}
```

**FinetuneService.trigger():** calls `aiServiceClient.startFinetune(...)` to obtain `glmJobId`, then updates `export_batch.glm_job_id` and sets `finetune_status = RUNNING`.

**Endpoint list (all require the ADMIN role):**

| Method | Path | Description |
|------|------|------|
| GET | `/api/training/samples` | Paginated query of approved samples that are waiting for export |
| POST | `/api/export/batch` | Create an export batch, merge the samples into JSONL and upload it to RustFS |
| POST | `/api/export/{batchId}/finetune` | Submit a fine-tuning job to the GLM factory |
| GET | `/api/export/{batchId}/status` | Query the fine-tuning job status |
| GET | `/api/export/list` | Paginated query of all export batches |

---

### 4.7 System Configuration Module

**SysConfigService.get(configKey):** looks up `(companyId, configKey)` first; on a miss it falls back to `(NULL, configKey)`, the global default. A company-level value therefore always overrides the global one.

```java
public String get(String configKey) {
    Long companyId = CompanyContext.get();
    // Look up the company-level configuration first
    SysConfig cfg = configMapper.selectByCompanyAndKey(companyId, configKey);
    if (cfg == null) {
        // Fall back to the global default (company_id IS NULL)
        cfg = configMapper.selectByCompanyAndKey(null, configKey);
    }
    return cfg != null ?
        cfg.getConfigValue() : null;
}
```

**Endpoint list (all require the ADMIN role):**

| Method | Path | Description |
|------|------|------|
| GET | `/api/config` | Get all configuration items (company-level + global defaults) |
| PUT | `/api/config/{key}` | Update a single configuration item (including Prompt templates) |

---

### 4.8 Video Processing Module

**Endpoint list (all require the ADMIN role, except the callback):**

| Method | Path | Description |
|------|------|------|
| POST | `/api/video/{sourceId}/process-frames` | Trigger frame-mode preprocessing (creates a FRAME_EXTRACT job, source_data → PREPROCESSING) |
| POST | `/api/video/{sourceId}/process-to-text` | Trigger clip-mode preprocessing (creates a VIDEO_TO_TEXT job, source_data → PREPROCESSING) |
| GET | `/api/video/{sourceId}/jobs` | List all asynchronous processing jobs of the video |
| PUT | `/api/video/jobs/{jobId}/reset` | Manually reset a FAILED job to PENDING (ADMIN only) |
| POST | `/api/video/jobs/{jobId}/callback` | AI service callback (internal network call; bypasses Shiro token validation and is protected by an IP allowlist) |

**VideoProcessService core logic:**

```java
@Transactional
@OperationLog(type = "TASK_CREATE")
public VideoProcessJob createJob(Long sourceId, String jobType, JsonNode params) {
    Long companyId = CompanyContext.get();

    // 1. source_data.status → PREPROCESSING
    sourceDataMapper.updateStatus(sourceId, "PREPROCESSING", companyId);

    // 2. Create the video_process_job (PENDING)
    VideoProcessJob job = VideoProcessJob.builder()
        .companyId(companyId).sourceId(sourceId)
        .jobType(jobType).params(params).status("PENDING").build();
    videoProcessJobMapper.insert(job);

    // 3.
    // Call the AI service asynchronously (non-blocking; the AI service calls back on its own)
    triggerAiAsync(job);
    return job;
}

// AI service callback: idempotent handling
@Transactional
public void handleCallback(Long jobId, boolean success, String outputPath, String errorMsg) {
    VideoProcessJob job = videoProcessJobMapper.selectById(jobId);

    // Idempotency: if the job is already SUCCESS, silently ignore the duplicate callback;
    // it must never create annotation_task rows a second time
    if ("SUCCESS".equals(job.getStatus())) return;

    if (success) {
        job.setStatus("SUCCESS");
        job.setOutputPath(outputPath);
        job.setCompletedAt(LocalDateTime.now());

        // source_data.status → PENDING (enters the downstream annotation flow)
        sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());

        // ⚠️ Follow-up task creation for the two modes (once ADMIN sees PENDING, tasks are created manually)
        if ("FRAME_EXTRACT".equals(job.getJobType())) {
            // Frame mode: the AI service returns a frame list; each frame maps to one annotation_task
            // (phase=EXTRACTION, video_unit_type=FRAME).
            // outputPath points to the frame-list JSON file (stored in RustFS); ADMIN creates the
            // per-frame tasks through POST /api/tasks.
            // EXTRACTION tasks are NOT created here: ADMIN decides how to schedule multi-frame work.
        } else if ("VIDEO_TO_TEXT".equals(job.getJobType())) {
            // Clip mode: derives a TEXT source_data whose parent_source_id points to the original video.
            // Nothing is created here: after the PENDING notification, ADMIN creates the task for the
            // derived TEXT source manually through POST /api/tasks.
            // The derived source_data was pre-created in triggerAiAsync() (status=PENDING).
        }
    } else {
        if (job.getRetryCount() >= job.getMaxRetries()) {
            // Retry limit reached: mark FAILED, move source_data → PENDING, keep error_message;
            // an ADMIN must reset the job manually
            job.setStatus("FAILED");
            job.setErrorMessage(errorMsg);
            sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
        } else {
            job.setStatus("RETRYING");
            job.setRetryCount(job.getRetryCount() + 1);
            job.setErrorMessage(errorMsg);
        }
    }
    videoProcessJobMapper.updateById(job);
}
```

---

## 5. State Machine Implementation Specification

Every status change **must** be validated by `StateValidator.assertTransition()`. Bypassing it and updating a status column directly through a Mapper is forbidden.

### 5.1 StateValidator

```java
// com/label/common/statemachine/StateValidator.java
import java.util.Map;
import java.util.Set;

public final class StateValidator {
    // Generic over the status enum S; throws when `to` is not reachable from `from`
    public static <S> void assertTransition(S from, S to, Map<S, Set<S>> transitions) {
        Set<S> allowed = transitions.getOrDefault(from, Set.of());
        if (!allowed.contains(to))
            throw new BusinessException("INVALID_STATE_TRANSITION", "非法状态转换: " + from + " → " + to);
    }
}
```

### 5.2 source_data State Machine

```java
public enum SourceStatus {
    PENDING, PREPROCESSING, EXTRACTING, QA_REVIEW, APPROVED, REJECTED;

    public static final Map<SourceStatus, Set<SourceStatus>> TRANSITIONS = Map.of(
        PENDING,       Set.of(EXTRACTING, PREPROCESSING),
        PREPROCESSING, Set.of(PENDING),
        EXTRACTING,    Set.of(QA_REVIEW),
        QA_REVIEW,     Set.of(APPROVED)
        // ⚠️ source_data never enters REJECTED:
        //    when QA is rejected, annotation_task → REJECTED while source_data stays in QA_REVIEW.
        //    APPROVED is the only terminal state of the whole pipeline.
    );
}
```

### 5.3 annotation_task State Machine

```java
public enum TaskStatus {
    UNCLAIMED, IN_PROGRESS, SUBMITTED, APPROVED, REJECTED;

    public static final Map<TaskStatus, Set<TaskStatus>> TRANSITIONS = Map.of(
        UNCLAIMED,   Set.of(IN_PROGRESS),
        // IN_PROGRESS → IN_PROGRESS covers ADMIN forced reassignment (owner changes, status does not)
        IN_PROGRESS, Set.of(SUBMITTED, UNCLAIMED, IN_PROGRESS),
        SUBMITTED,   Set.of(APPROVED, REJECTED),
        REJECTED,    Set.of(IN_PROGRESS)   // reclaim after rejection
    );
}
```

### 5.4 training_dataset State Machine

```java
public enum DatasetStatus {
    PENDING_REVIEW, APPROVED, REJECTED;

    public static final Map<DatasetStatus, Set<DatasetStatus>> TRANSITIONS = Map.of(
        PENDING_REVIEW, Set.of(APPROVED, REJECTED),
        REJECTED,       Set.of(PENDING_REVIEW)   // can be revised and resubmitted after rejection
    );
}
```

### 5.5 video_process_job State Machine

```java
public enum VideoJobStatus {
    PENDING, RUNNING, SUCCESS, FAILED, RETRYING;

    public static final Map<VideoJobStatus, Set<VideoJobStatus>> TRANSITIONS = Map.of(
        PENDING,  Set.of(RUNNING),
        RUNNING,  Set.of(SUCCESS, RETRYING, FAILED),
        RETRYING, Set.of(RUNNING, FAILED)
        // FAILED → PENDING is triggered manually by ADMIN through the reset endpoint;
        // it is deliberately not declared here as an automatic transition
    );
}
```

---

## 6. Docker Compose Configuration

```yaml
# docker-compose.yml
version: "3.9"

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: label_db
      POSTGRES_USER: label
      POSTGRES_PASSWORD: label_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U label -d label_db"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass redis_password
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "-a", "redis_password", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  rustfs:
    image: rustfs/rustfs:latest   # replace with the image actually used in production
    environment:
      RUSTFS_ACCESS_KEY: minioadmin
      RUSTFS_SECRET_KEY: minioadmin
    volumes:
      - rustfs_data:/data
    ports:
      - "9000:9000"   # S3 API port
      - "9001:9001"   # web console port

  backend:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      SPRING_DATASOURCE_URL: jdbc:postgresql://postgres:5432/label_db
      SPRING_DATASOURCE_USERNAME: label
      SPRING_DATASOURCE_PASSWORD: label_password
      # Spring Boot 3 binds Redis settings under spring.data.redis.*; the three names below
      # assume application.yml maps them explicitly (otherwise use SPRING_DATA_REDIS_HOST etc.)
      SPRING_REDIS_HOST: redis
      SPRING_REDIS_PORT: 6379
      SPRING_REDIS_PASSWORD: redis_password
      RUSTFS_ENDPOINT: http://rustfs:9000
      RUSTFS_ACCESS_KEY: minioadmin
      RUSTFS_SECRET_KEY: minioadmin
      AI_SERVICE_BASE_URL: http://ai-service:8000
    ports:
      - "8080:8080"
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      rustfs:
        condition: service_started
    healthcheck:
      # assumes curl is present in the backend image; an alpine JRE base may need it installed
      test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
      interval: 15s
      retries: 5

  ai-service:
    build:
      context: ../ai_service   # directory of the Python FastAPI service
      dockerfile: Dockerfile
    environment:
      RUSTFS_ENDPOINT: http://rustfs:9000
      RUSTFS_ACCESS_KEY: minioadmin
      RUSTFS_SECRET_KEY: minioadmin
    ports:
      - "8000:8000"
    depends_on:
      - rustfs

  frontend:
    image: nginx:alpine
    volumes:
      - ../frontend/dist:/usr/share/nginx/html:ro
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    ports:
      - "80:80"
    depends_on:
      backend:
        condition: service_healthy

volumes:
  postgres_data:
  redis_data:
  rustfs_data:
```

**Dockerfile (backend):**

```dockerfile
FROM eclipse-temurin:17-jre-alpine
WORKDIR /app
COPY target/label-backend-*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
```

---

## 7. Testing Strategy

### 7.1 Basic Principles

- Integration tests **must** run against real PostgreSQL and Redis instances (Testcontainers); mocking the database alone is not acceptable
- Every group of protected endpoints **must** have at least one integration test that exercises the Shiro authentication/authorization filter chain (missing token → 401, insufficient role → 403)

### 7.2 Concurrent Task Claim Test (required)

```java
@Test
void concurrentClaimShouldOnlySucceedOnce() throws InterruptedException {
    // Arrange: create one UNCLAIMED task
    Long taskId = createUnclaimedTask();
    int threadCount =
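        10;
    CountDownLatch latch = new CountDownLatch(1);
    AtomicInteger successCount = new AtomicInteger(0);

    ExecutorService pool = Executors.newFixedThreadPool(threadCount);
    for (int i = 0; i < threadCount; i++) {
        pool.submit(() -> {
            try {
                latch.await();
                taskClaimService.claim(taskId);
                successCount.incrementAndGet();
            } catch (BusinessException e) {
                // Expected: every other thread fails with "task already claimed"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    latch.countDown();
    // shutdown() is required first; otherwise awaitTermination just waits out the full timeout
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);

    // Assert: exactly one thread claimed the task
    assertThat(successCount.get()).isEqualTo(1);
    AnnotationTask task = taskMapper.selectById(taskId);
    assertThat(task.getStatus()).isEqualTo("IN_PROGRESS");
    assertThat(task.getClaimedBy()).isNotNull();
}
```

### 7.3 Video Callback Idempotency Test (required)

```java
@Test
void duplicateSuccessCallbackShouldBeIdempotent() {
    Long sourceId = createVideoSource();
    // Empty Jackson object node; the job parameters do not matter for this test
    JsonNode params = JsonNodeFactory.instance.objectNode();
    VideoProcessJob job = videoProcessService.createJob(sourceId, "FRAME_EXTRACT", params);

    // First success callback
    videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);
    long countAfterFirst = annotationTaskMapper.countBySourceId(sourceId);

    // Duplicate success callback
    videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);

    // Assert: the duplicate callback creates no additional annotation_task.
    // (handleCallback() in §4.8 creates no tasks itself, so the count is compared
    // against the value after the first callback rather than a fixed number.)
    long taskCount = annotationTaskMapper.countBySourceId(sourceId);
    assertThat(taskCount).isEqualTo(countAfterFirst);
}
```

### 7.4 Illegal State Transition Test

```java
@Test
void illegalStateTransitionShouldThrow() {
    // Assert: APPROVED → IN_PROGRESS is rejected
    assertThatThrownBy(() ->
        StateValidator.assertTransition(TaskStatus.APPROVED, TaskStatus.IN_PROGRESS, TaskStatus.TRANSITIONS)
    ).isInstanceOf(BusinessException.class)
     .hasMessageContaining("非法状态转换");
}
```

### 7.5 Multi-Tenant Isolation Test

```java
@Test
void companyACannotAccessCompanyBData() {
    Long sourceIdB = createSourceForCompany(companyB);

    // Request a companyB resource while acting as companyA
    CompanyContext.set(companyA);
    assertThatThrownBy(() -> sourceService.findById(sourceIdB))
        .isInstanceOf(BusinessException.class);
}
```

---

## 8. Constitution Compliance Checklist

Before a PR is merged, the reviewer **must** verify every item below:

| # | Constitution principle | Where implemented | What to check |
|---|----------|----------|----------|
| 1 | Environment constraints (JDK 17, SB 3, Shiro, MyBatis Plus) | `pom.xml` | Versions satisfy the constraints; Spring Security is not pulled in alongside |
| 2 | Multi-tenant data isolation | `TenantLineInnerInterceptor`, `CompanyContext` | Every Mapper gets company_id injected automatically; the ThreadLocal is cleared in a finally block |
| 3 | BCrypt passwords, UUID tokens, sliding expiry, no JWT | `AuthService`, `TokenFilter`, `RedisKeyManager` | No plaintext password storage; every valid request resets the TTL; no JWT library on the classpath |
| 4 | Tiered RBAC, permission annotations, cache eviction on role change | `UserRealm`, `@RequiresRoles`, `UserService` | No ad hoc if-role checks; `updateRole()` deletes the cache immediately |
| 5 | Dual pipelines, cascaded QA task creation, parent_source_id traceability | `ExtractionService.approve()` | Cascading actions run inside the same @Transactional; parent_source_id is non-null for video-to-text sources |
| 6 | State machine integrity | `StateValidator`, every `*Status` enum | Every status change calls `StateValidator`; no code path writes status through a Mapper directly |
| 7 | Double protection for task claiming | `TaskClaimService.claim()` | Redis SET NX and DB WHERE status='UNCLAIMED' are both in place |
| 8 | Idempotent async jobs, retry limit, FAILED needs manual reset | `VideoProcessService.handleCallback()` | Duplicate success callbacks are silently ignored; `retry_count >= max_retries` sets FAILED |
| 9 | Append-only audit log, AOP aspect, audit failure never rolls back business logic | `AuditAspect`, `@OperationLog` | No UPDATE/DELETE on `sys_operation_log`; audit exceptions are only logged at error level |
| 10 | RESTful URLs, unified response format, mandatory pagination | `Result`, every Controller | No verbs in paths; no bare POJO responses; every list endpoint takes pagination parameters |
| 11 | YAGNI: business logic in Services, Controllers handle HTTP only | Package structure | Controllers contain no business decisions; no unused abstraction layers |

---

## 9. Design Review Report

**Review date**: 2026-04-09 | **Method**: five-dimension review by an independent sub-agent + manual re-check | **Initial quality score**: 6.5/10 → after fixes: **8.5/10**

### 9.1 Issues Found and Fixed

| # | Severity | Location | Problem | Fix |
|---|--------|------|----------|----------|
| A | CRITICAL | §4.3 task pool endpoints | The role filter of `GET /api/tasks/pool` was unspecified, so developers could not tell which tasks each role sees | Added to the endpoint description: ANNOTATOR sees EXTRACTION-UNCLAIMED; REVIEWER sees both kinds of UNCLAIMED tasks; ADMIN uses the full-list endpoint |
| B | HIGH | §4.4, §4.5 review endpoints | approve/reject did not state that a submitter cannot review their own work, which allowed quality checks to be bypassed | Added the self-review guard to the `approve()` and `reject()` examples: `if (task.getClaimedBy().equals(getCurrentUserId())) throw` |
| C | HIGH | §4.8 video processing | The video module had no REST endpoint list at all, and the follow-up task creation after frame mode / clip mode success was unclear | Added 5 endpoints (with ADMIN-only permissions) and spelled out the two mode-specific paths in handleCallback() |
| D | MEDIUM | §4.4 | No `ExtractionService.reject()` example; the source_data status after rejection was unclear | Added the full reject() implementation; documented that source_data stays in EXTRACTING until the annotator resubmits |
| E | MEDIUM | §4.5 | No `QaService.reject()` example; the training_dataset status flow after rejection was unclear | Added reject(): training_dataset → REJECTED; after resubmission → PENDING_REVIEW |
| F | LOW | §4.3 | The reassign endpoint description was too terse and did not cover the annotation_task_history write rule | Added: status stays IN_PROGRESS; history records from=IN_PROGRESS/to=IN_PROGRESS + note |

### 9.2 Open Design Choices (need business clarification)

| # | Issue | Background | Recommendation |
|---|------|------|------|
| G | The original requirement says "if no reviewer is free in the task pool, an annotator acts as reviewer" | The constitution requires REVIEWER ⊃ ANNOTATOR role inheritance; the system cannot elevate ANNOTATOR permissions dynamically | **Drop this mechanism**: require every QA review to be done by a REVIEWER or ADMIN; when no REVIEWER is available, ADMIN reviews instead. Keeping it would require amending the constitution and adding dynamic privilege elevation (outside the current architecture) |
| H | Fine-grained access control for `GET /api/tasks/{id}` | Can an ANNOTATOR view only their own tasks? Are REJECTED tasks visible to the original claimer? | The current design relies on `company_id` isolation only; add a Service-layer check of the form `claimed_by = currentUserId OR role >= REVIEWER` |

### 9.3 Constitution Compliance After Fixes

| Constitution principle | Before | After |
|----------|--------|--------|
| 4. Tiered RBAC (declared via permission annotations) | ✅ Mostly covered | ✅ Self-review guard added; task pool filter rules complete |
| 5. Dual annotation pipelines (cascade rules) | ⚠️ reject path missing | ✅ reject() examples added; source_data behavior on rejection is explicit |
| 6. State machine integrity | ⚠️ QA reject transition unclear | ✅ training_dataset REJECTED → PENDING_REVIEW is complete |
| 8. Async job handling (idempotent callback) | ⚠️ Branches for the two video modes missing | ✅ handleCallback branches documented |

---

### 9.4 Focused Review of the Approval Workflow (Round 2)

**Review date**: 2026-04-09 | **Focus**: whether the approval workflow can actually be executed

#### 9.4.1 Issues Found

| # | Severity | Location | Problem | Root cause | Suggested fix |
|---|--------|------|----------|------|----------|
| I | CRITICAL | §4.3, §4.4, §4.5 | **REVIEWER has no review inbox**: `GET /api/tasks/pool` returns only UNCLAIMED tasks, so a REVIEWER cannot discover which tasks are SUBMITTED and waiting for review. The approval flow has no entry point at the API level. | The task pool was designed for claiming only and did not consider reviewing | New endpoint `GET /api/tasks/pending-review` (REVIEWER), returning tasks with `phase IN (EXTRACTION, QA_GENERATION) AND status=SUBMITTED` (paginated); ADMIN can query all of them |
| J | HIGH | §4.4 `ExtractionService.approve()` | **Synchronous AI call inside @Transactional**: `generateQa()` issues an HTTP request inside the transaction and may take 5 to 30 seconds. Consequences: ① a DB connection is held for a long time (risk of pool exhaustion); ② an AI failure rolls back the completed approval (the annotator's and reviewer's work is lost); ③ the approve endpoint's latency becomes unpredictable. | Approval and QA generation were not decoupled | **Two-phase split**: the `approve()` transaction performs only steps 1 to 3 (is_final, APPROVED, history) and then publishes `ExtractionApprovedEvent`; `QaGenerationListener` consumes the event asynchronously and performs steps 4 to 7 (AI call, QA generation, QA task creation, source_data update). If the AI call fails, the approval is not rolled back and the QA task can be retried |
| K | HIGH | §4.3, §4.4, §4.5 | **No path to pick up a REJECTED task**: after rejection task.status = REJECTED and the annotator is supposed to "claim again", but: ① `GET /api/tasks/pool` shows only UNCLAIMED tasks, so REJECTED tasks are invisible; ② there is no `POST /api/tasks/{id}/reclaim` endpoint; ③ the state machine declares `REJECTED → IN_PROGRESS` as legal, yet no API triggers it. The rejected annotator has no way to act. | The post-rejection path existed only in the state machine and was never turned into an endpoint | Option 1: make `GET /api/tasks/mine` include REJECTED tasks and add `POST /api/tasks/{id}/reclaim` (ANNOTATOR; validates REJECTED → IN_PROGRESS and sets status=IN_PROGRESS). Option 2: set the task to UNCLAIMED on rejection and return it to the shared pool (the original claimer or anyone else may claim it). **Option 1 is recommended** (keeps the original claimer's priority and prevents others from grabbing the task) |
| L | MEDIUM | §4.5 `QaService.approve()` | **Duplicate variable declaration (compile error)**: the `task` variable is declared twice in the method (lines 1057 and 1065); Java forbids redeclaration in the same scope, so the code does not compile. | Copy-paste mistake | Delete the `AnnotationTask task =` on line 1065 and reuse the `task` variable declared on line 1057 |
| M | MEDIUM | §5.2 `SourceStatus` state machine | **source_data.REJECTED is unreachable**: the state machine declares `QA_REVIEW → REJECTED` as legal, but `QaService.reject()` explicitly notes "source_data stays in QA_REVIEW" and never triggers it. REJECTED is a dead state; the state machine and the implementation disagree, which can mislead implementers. | The state machine definition was not aligned with the implementation | Remove `QA_REVIEW → REJECTED` from `SourceStatus.TRANSITIONS` and document that source_data never enters REJECTED; on QA rejection source_data stays in QA_REVIEW and only annotation_task enters REJECTED |

#### 9.4.2 Full Approval Workflow (expected state after fixes)

```
EXTRACTION phase:
  ANNOTATOR claims (/api/tasks/{id}/claim)
  → submits from the annotation workbench (/api/extraction/{taskId}/submit)
  → REVIEWER finds it in the review inbox (/api/tasks/pending-review)
    → approve (/api/extraction/{taskId}/approve)
        → [async] AI generates QA → QA_GENERATION task created → source_data → QA_REVIEW
    → reject (/api/extraction/{taskId}/reject)
        → annotator sees the REJECTED task under "my tasks"
        → reclaim (/api/tasks/{id}/reclaim) → IN_PROGRESS → revise and resubmit

QA_GENERATION phase:
  ANNOTATOR/REVIEWER claims (/api/tasks/{id}/claim)
  → edits and submits QA pairs (/api/qa/{taskId}/submit)
  → REVIEWER finds it in the review inbox (/api/tasks/pending-review)
    → approve (/api/qa/{taskId}/approve)
        → training_dataset → APPROVED
        → source_data → APPROVED (pipeline terminal state)
    → reject (/api/qa/{taskId}/reject)
        → training_dataset → REJECTED; source_data stays in QA_REVIEW
        → annotator reclaims (/api/tasks/{id}/reclaim) → revise and resubmit
```

#### 9.4.3 Items Requiring Immediate Fixes

| Issue | Where to fix | Fix type |
|------|----------|----------|
| I (no review inbox) | §4.3 endpoint list | Add endpoint `GET /api/tasks/pending-review` |
| J (AI call inside @Transactional) | §4.4 ExtractionService.approve() | Split into two phases; steps 4-7 become asynchronous |
| K (no reclaim path for REJECTED tasks) | §4.3 endpoint list + §4.4/4.5 notes | Add endpoint `POST /api/tasks/{id}/reclaim`; `GET /api/tasks/mine` includes REJECTED |
| L (duplicate variable declaration) | §4.5 QaService.approve() code | Delete the second `AnnotationTask task =` declaration |
| M (dead source_data state) | §5.2 SourceStatus state machine | Remove the `QA_REVIEW → REJECTED` transition |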
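
The reclaim rule behind item K can be sketched in isolation. The block below is a minimal, in-memory illustration only, not the project's `TaskService`: the `ReclaimSketch` class, the `Task` record and the `reclaim` method are hypothetical names introduced here, and the transition table simply mirrors §5.3. It shows the three checks the endpoint description asks for: the task is REJECTED, the caller is the original claimer, and REJECTED → IN_PROGRESS is a declared transition.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class ReclaimSketch {
    enum TaskStatus { UNCLAIMED, IN_PROGRESS, SUBMITTED, APPROVED, REJECTED }

    // Mirrors the annotation_task transition table from section 5.3.
    static final Map<TaskStatus, Set<TaskStatus>> TRANSITIONS = Map.of(
        TaskStatus.UNCLAIMED, Set.of(TaskStatus.IN_PROGRESS),
        TaskStatus.IN_PROGRESS, Set.of(TaskStatus.SUBMITTED, TaskStatus.UNCLAIMED, TaskStatus.IN_PROGRESS),
        TaskStatus.SUBMITTED, Set.of(TaskStatus.APPROVED, TaskStatus.REJECTED),
        TaskStatus.REJECTED, Set.of(TaskStatus.IN_PROGRESS));

    // Hypothetical in-memory stand-in for an annotation_task row.
    record Task(long id, TaskStatus status, Long claimedBy) {}

    // Returns the task in IN_PROGRESS, or throws if any reclaim precondition fails.
    static Task reclaim(Task task, long currentUserId) {
        if (task.status() != TaskStatus.REJECTED) {
            throw new IllegalStateException("only REJECTED tasks can be reclaimed");
        }
        if (!Objects.equals(task.claimedBy(), currentUserId)) {
            throw new IllegalStateException("only the original claimer can reclaim");
        }
        Set<TaskStatus> allowed = TRANSITIONS.getOrDefault(task.status(), Set.of());
        if (!allowed.contains(TaskStatus.IN_PROGRESS)) {
            throw new IllegalStateException("transition not declared in the state machine");
        }
        // A real service would also write the task history row in the same transaction.
        return new Task(task.id(), TaskStatus.IN_PROGRESS, task.claimedBy());
    }

    public static void main(String[] args) {
        Task rejected = new Task(1L, TaskStatus.REJECTED, 7L);
        System.out.println(reclaim(rejected, 7L).status());
    }
}
```

In the real service the same checks would presumably go through `StateValidator.assertTransition()` and raise `BusinessException`; plain `IllegalStateException` is used here only to keep the sketch self-contained.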