Phase 0:research.md(10项技术决策,无需澄清项) Phase 1:data-model.md(11张表+Redis结构),contracts/(8个模块API契约),quickstart.md(Docker Compose启动+流水线验证) plan.md:宪章11条全部通过,项目结构确认
1604 lines
64 KiB
Markdown
1604 lines
64 KiB
Markdown
# label_backend 开发实施指南
|
||
|
||
**版本:1.0.0 | 日期:2026-04-09 | 依据:backend后台设计.md v2.2 + constitution.md v1.1.0**
|
||
|
||
---
|
||
|
||
## 目录
|
||
|
||
- [一、项目总览](#一项目总览)
|
||
- [1.1 系统定位](#11-系统定位)
|
||
- [1.2 数据流水线](#12-数据流水线)
|
||
- [1.3 技术栈矩阵](#13-技术栈矩阵)
|
||
- [1.4 后端模块清单](#14-后端模块清单)
|
||
- [1.5 包结构](#15-包结构)
|
||
- [二、数据库 DDL](#二数据库-ddl)
|
||
- [三、公共基础设施](#三公共基础设施)
|
||
- [3.1 统一响应封装](#31-统一响应封装)
|
||
- [3.2 全局异常处理](#32-全局异常处理)
|
||
- [3.3 多租户 CompanyContext(ThreadLocal)](#33-多租户-companycontextthreadlocal)
|
||
- [3.4 Shiro 配置](#34-shiro-配置)
|
||
- [3.5 Redis Key 管理](#35-redis-key-管理)
|
||
- [3.6 AOP 审计日志切面](#36-aop-审计日志切面)
|
||
- [3.7 RustFS S3 客户端](#37-rustfs-s3-客户端)
|
||
- [3.8 AI 服务 HTTP 客户端](#38-ai-服务-http-客户端)
|
||
- [四、业务模块纵切](#四业务模块纵切)
|
||
- [4.1 用户与权限模块](#41-用户与权限模块)
|
||
- [4.2 资料管理模块](#42-资料管理模块)
|
||
- [4.3 任务管理模块(含并发控制)](#43-任务管理模块含并发控制)
|
||
- [4.4 标注工作台模块(EXTRACTION 阶段)](#44-标注工作台模块extraction-阶段)
|
||
- [4.5 问答生成模块(QA_GENERATION 阶段)](#45-问答生成模块qa_generation-阶段)
|
||
- [4.6 训练数据导出模块](#46-训练数据导出模块)
|
||
- [4.7 系统配置模块](#47-系统配置模块)
|
||
- [4.8 视频处理模块](#48-视频处理模块)
|
||
- [五、状态机实现规范](#五状态机实现规范)
|
||
- [5.1 StateValidator](#51-statevalidator)
|
||
- [5.2 source\_data 状态机](#52-source_data-状态机)
|
||
- [5.3 annotation\_task 状态机](#53-annotation_task-状态机)
|
||
- [5.4 training\_dataset 状态机](#54-training_dataset-状态机)
|
||
- [5.5 video\_process\_job 状态机](#55-video_process_job-状态机)
|
||
- [六、Docker Compose 配置](#六docker-compose-配置)
|
||
- [七、测试策略](#七测试策略)
|
||
- [7.1 基本原则](#71-基本原则)
|
||
- [7.2 并发任务领取测试(必须)](#72-并发任务领取测试必须)
|
||
- [7.3 视频回调幂等测试(必须)](#73-视频回调幂等测试必须)
|
||
- [7.4 状态机越界拒绝测试](#74-状态机越界拒绝测试)
|
||
- [7.5 多租户隔离测试](#75-多租户隔离测试)
|
||
- [八、宪章合规检查清单](#八宪章合规检查清单)
|
||
|
||
---
|
||
|
||
## 一、项目总览
|
||
|
||
### 1.1 系统定位
|
||
|
||
label_backend 是知识图谱智能标注平台的后端服务,基于 Spring Boot 3 构建,驱动**文本线**和**图片线**两条标注流水线,将文档、图片、视频原始资料处理为 GLM 微调格式的训练数据集。
|
||
|
||
### 1.2 数据流水线
|
||
|
||
```
|
||
文本线: 文档 → 三元组提取(主语/谓语/宾语 + 原文片段 + 字符偏移)
|
||
→ 问答对生成 → 审批 → 训练样本(GLM 文本格式)
|
||
|
||
图片线: 图片 → 四元组提取(主体/关系/客体/修饰词 + bbox + 裁剪图)
|
||
→ 问答对生成 → 审批 → 训练样本(GLM 图文格式)
|
||
|
||
视频预处理(非独立流水线):
|
||
帧模式: 视频 → AI 抽帧 → 每帧作为图片进入图片线
|
||
片段模式: 视频 → 多模态模型转文本 → 派生 TEXT source_data → 进入文本线
|
||
```
|
||
|
||
### 1.3 技术栈矩阵
|
||
|
||
| 层次 | 技术 | 版本约束 |
|
||
|------|------|----------|
|
||
| 运行时 | JDK | 17(LTS) |
|
||
| 框架 | Spring Boot | ≥ 3.0.x |
|
||
| 认证/鉴权 | Apache Shiro | ≥ 1.13.x(兼容 Spring Boot 3) |
|
||
| ORM | MyBatis Plus | ≥ 3.5.x |
|
||
| 数据库 | PostgreSQL | ≥ 14 |
|
||
| 缓存/锁 | Redis | ≥ 6.x |
|
||
| 对象存储 | RustFS(S3 兼容接口) | 当前稳定版 |
|
||
| AI 服务 | Python FastAPI(HTTP 调用) | JVM 侧仅作 RestClient 调用 |
|
||
| 容器化 | Docker Compose | ≥ 2.x |
|
||
| 构建工具 | Maven | ≥ 3.8 |
|
||
|
||
### 1.4 后端模块清单
|
||
|
||
| 模块 | 职责 |
|
||
|------|------|
|
||
| 用户与权限模块 | 用户管理、Shiro RBAC、Token 认证 |
|
||
| 资料管理模块 | 文件上传至 RustFS、source_data 元数据管理 |
|
||
| 任务池模块 | 任务创建、领取(分布式锁 + 乐观锁)、状态流转 |
|
||
| 标注工作台模块 | 调用 AI 提取三/四元组、annotation_result JSONB 写入 |
|
||
| 问答生成模块 | 调用 AI 生成候选问答对、training_dataset 管理 |
|
||
| 训练数据导出模块 | JSONL 批次导出、GLM 微调对接 |
|
||
| 系统配置模块 | Prompt 模板、模型参数、全局/公司级配置管理 |
|
||
| 视频处理模块 | 异步抽帧 / 转文本任务跟踪、幂等回调处理 |
|
||
|
||
### 1.5 包结构
|
||
|
||
```
|
||
com.label
|
||
├── LabelBackendApplication.java
|
||
├── common/
|
||
│ ├── result/ # Result<T>、ResultCode、PageResult<T>
|
||
│ ├── exception/ # BusinessException、GlobalExceptionHandler
|
||
│ ├── context/ # CompanyContext(ThreadLocal)
|
||
│ ├── shiro/ # TokenFilter、UserRealm、ShiroConfig
|
||
│ ├── redis/ # RedisKeyManager、RedisService
|
||
│ ├── aop/ # AuditAspect、@OperationLog 注解
|
||
│ ├── storage/ # RustFsClient(S3 兼容封装)
|
||
│ ├── ai/ # AiServiceClient(RestClient 封装 8 个端点)
|
||
│ └── statemachine/ # StateValidator、各状态枚举
|
||
├── module/
|
||
│ ├── user/
|
||
│ │ ├── controller/ # AuthController、UserController
|
||
│ │ ├── service/ # AuthService、UserService
|
||
│ │ ├── mapper/ # SysUserMapper、SysCompanyMapper
|
||
│ │ ├── entity/ # SysUser、SysCompany
|
||
│ │ └── dto/ # LoginRequest、UserDTO、UserCreateRequest
|
||
│ ├── source/
|
||
│ │ ├── controller/ # SourceController
|
||
│ │ ├── service/ # SourceService
|
||
│ │ ├── mapper/ # SourceDataMapper
|
||
│ │ ├── entity/ # SourceData
|
||
│ │ └── dto/ # SourceUploadResponse、SourceListQuery
|
||
│ ├── task/
|
||
│ │ ├── controller/ # TaskController
|
||
│ │ ├── service/ # TaskService、TaskClaimService
|
||
│ │ ├── mapper/ # AnnotationTaskMapper、TaskHistoryMapper
|
||
│ │ ├── entity/ # AnnotationTask、AnnotationTaskHistory
|
||
│ │ └── dto/ # TaskCreateRequest、TaskPoolQuery、TaskDetailDTO
|
||
│ ├── annotation/
|
||
│ │ ├── controller/ # ExtractionController、QaController
|
||
│ │ ├── service/ # ExtractionService、QaService
|
||
│ │ ├── mapper/ # AnnotationResultMapper、TrainingDatasetMapper
|
||
│ │ ├── entity/ # AnnotationResult、TrainingDataset
|
||
│ │ └── dto/ # ExtractionResultDTO、QaItemDTO、RejectRequest
|
||
│ ├── export/
|
||
│ │ ├── controller/ # ExportController
|
||
│ │ ├── service/ # ExportService、FinetuneService
|
||
│ │ ├── mapper/ # ExportBatchMapper
|
||
│ │ ├── entity/ # ExportBatch
|
||
│ │ └── dto/ # ExportBatchRequest、FinetuneRequest
|
||
│ ├── config/
|
||
│ │ ├── controller/ # SysConfigController
|
||
│ │ ├── service/ # SysConfigService
|
||
│ │ ├── mapper/ # SysConfigMapper
|
||
│ │ ├── entity/ # SysConfig
|
||
│ │ └── dto/ # ConfigUpdateRequest
|
||
│ └── video/
|
||
│ ├── controller/ # VideoController
|
||
│ ├── service/ # VideoProcessService
|
||
│ ├── mapper/ # VideoProcessJobMapper
|
||
│ ├── entity/ # VideoProcessJob
|
||
│ └── dto/ # VideoProcessRequest、VideoCallbackRequest
|
||
```
|
||
|
||
[↑ 返回目录](#目录)
|
||
|
||
---
|
||
|
||
## 二、数据库 DDL
|
||
|
||
> 执行顺序:sys_company → sys_user → source_data → annotation_task → annotation_result → training_dataset → export_batch → sys_config → sys_operation_log → annotation_task_history → video_process_job
|
||
|
||
```sql
|
||
-- =============================================
|
||
-- 1. sys_company — 公司表(多租户根节点)
|
||
-- =============================================
|
||
CREATE TABLE sys_company (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_name VARCHAR(100) NOT NULL UNIQUE,
|
||
company_code VARCHAR(50) NOT NULL UNIQUE,
|
||
status VARCHAR(10) NOT NULL DEFAULT 'ACTIVE', -- ACTIVE / DISABLED
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
|
||
-- =============================================
|
||
-- 2. sys_user — 用户表
|
||
-- =============================================
|
||
CREATE TABLE sys_user (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
username VARCHAR(50) NOT NULL,
|
||
password_hash VARCHAR(255) NOT NULL, -- BCrypt,强度因子 >= 10
|
||
real_name VARCHAR(50),
|
||
role VARCHAR(20) NOT NULL, -- UPLOADER / ANNOTATOR / REVIEWER / ADMIN
|
||
status VARCHAR(10) NOT NULL DEFAULT 'ACTIVE',
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
CONSTRAINT uq_company_username UNIQUE (company_id, username)
|
||
);
|
||
CREATE INDEX idx_sys_user_company ON sys_user(company_id);
|
||
|
||
-- =============================================
|
||
-- 3. source_data — 原始资料元数据表
|
||
-- =============================================
|
||
CREATE TABLE source_data (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
uploader_id BIGINT REFERENCES sys_user(id),
|
||
data_type VARCHAR(20) NOT NULL, -- TEXT / IMAGE / VIDEO
|
||
file_path VARCHAR(500) NOT NULL,
|
||
file_name VARCHAR(255) NOT NULL,
|
||
file_size BIGINT,
|
||
bucket_name VARCHAR(100) NOT NULL,
|
||
parent_source_id BIGINT REFERENCES source_data(id), -- 视频转文本时指向原视频
|
||
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
|
||
-- PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED(无 REJECTED 状态,QA 驳回作用于 annotation_task)
|
||
reject_reason TEXT,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_source_data_company ON source_data(company_id);
|
||
CREATE INDEX idx_source_data_status ON source_data(company_id, status);
|
||
CREATE INDEX idx_source_data_parent ON source_data(parent_source_id);
|
||
|
||
-- =============================================
|
||
-- 4. annotation_task — 标注任务表
|
||
-- =============================================
|
||
CREATE TABLE annotation_task (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
source_id BIGINT NOT NULL REFERENCES source_data(id),
|
||
phase VARCHAR(20) NOT NULL, -- EXTRACTION / QA_GENERATION
|
||
task_type VARCHAR(20) NOT NULL, -- AI_ASSISTED / MANUAL
|
||
ai_model VARCHAR(50),
|
||
video_unit_type VARCHAR(20), -- FRAME(仅视频帧模式填写,其余为空)
|
||
video_unit_info JSONB, -- {"frame_index":150,"time_sec":5.0,"frame_path":"..."}
|
||
claimed_by BIGINT REFERENCES sys_user(id),
|
||
claimed_at TIMESTAMP,
|
||
status VARCHAR(20) NOT NULL DEFAULT 'UNCLAIMED',
|
||
-- UNCLAIMED / IN_PROGRESS / SUBMITTED / APPROVED / REJECTED
|
||
reject_reason TEXT,
|
||
submitted_at TIMESTAMP,
|
||
completed_at TIMESTAMP,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_annotation_task_company ON annotation_task(company_id);
|
||
CREATE INDEX idx_annotation_task_pool ON annotation_task(company_id, phase, status);
|
||
CREATE INDEX idx_annotation_task_claimed ON annotation_task(claimed_by, status);
|
||
|
||
-- =============================================
|
||
-- 5. annotation_result — 标注结果表(EXTRACTION 阶段,JSONB 聚合)
|
||
-- =============================================
|
||
CREATE TABLE annotation_result (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
|
||
result_json JSONB NOT NULL, -- 整体覆盖,禁止局部 PATCH
|
||
is_final BOOLEAN NOT NULL DEFAULT FALSE,
|
||
submitted_by BIGINT REFERENCES sys_user(id),
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_annotation_result_task ON annotation_result(task_id);
|
||
CREATE INDEX idx_annotation_result_final ON annotation_result(company_id, is_final);
|
||
|
||
-- =============================================
|
||
-- 6. training_dataset — 训练样本表(问答对终态)
|
||
-- =============================================
|
||
CREATE TABLE training_dataset (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
|
||
source_id BIGINT NOT NULL REFERENCES source_data(id),
|
||
extraction_result_id BIGINT NOT NULL REFERENCES annotation_result(id),
|
||
sample_type VARCHAR(20) NOT NULL, -- TEXT / IMAGE / VIDEO_FRAME
|
||
glm_format_json JSONB NOT NULL, -- GLM 微调格式,JSONB 支持字段级查询
|
||
export_batch_id VARCHAR(50), -- 未导出时为空
|
||
status VARCHAR(20) NOT NULL DEFAULT 'PENDING_REVIEW',
|
||
-- PENDING_REVIEW / APPROVED / REJECTED
|
||
reject_reason TEXT,
|
||
reviewed_by BIGINT REFERENCES sys_user(id),
|
||
exported_at TIMESTAMP,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_training_dataset_company ON training_dataset(company_id);
|
||
CREATE INDEX idx_training_dataset_status ON training_dataset(company_id, status);
|
||
CREATE INDEX idx_training_dataset_batch ON training_dataset(export_batch_id);
|
||
|
||
-- =============================================
|
||
-- 7. export_batch — 导出批次表
|
||
-- =============================================
|
||
CREATE TABLE export_batch (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
batch_uuid VARCHAR(50) NOT NULL UNIQUE,
|
||
dataset_file_path VARCHAR(500),
|
||
sample_count INT NOT NULL DEFAULT 0,
|
||
glm_job_id VARCHAR(100), -- 调用微调接口后填写,初始为 NULL
|
||
finetune_status VARCHAR(20) NOT NULL DEFAULT 'NOT_STARTED',
|
||
-- NOT_STARTED / RUNNING / SUCCESS / FAILED
|
||
error_message TEXT,
|
||
created_by BIGINT REFERENCES sys_user(id),
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_export_batch_company ON export_batch(company_id);
|
||
|
||
-- =============================================
|
||
-- 8. sys_config — 系统配置表(支持全局默认 + 公司级覆盖)
|
||
-- =============================================
|
||
CREATE TABLE sys_config (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT REFERENCES sys_company(id), -- NULL = 全局默认配置
|
||
config_key VARCHAR(100) NOT NULL,
|
||
config_value TEXT NOT NULL,
|
||
description TEXT,
|
||
updated_by BIGINT REFERENCES sys_user(id),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
CONSTRAINT uq_config_company_key UNIQUE (company_id, config_key)
|
||
);
|
||
|
||
-- 预置全局配置项
|
||
INSERT INTO sys_config (company_id, config_key, config_value, description) VALUES
|
||
(NULL, 'prompt_extract_text', '...', '文本三元组提取 Prompt 模板'),
|
||
(NULL, 'prompt_extract_image', '...', '图像四元组提取 Prompt 模板'),
|
||
(NULL, 'prompt_video_to_text', '...', '视频转文本 Prompt 模板'),
|
||
(NULL, 'prompt_qa_gen_text', '...', '文本问答对生成 Prompt 模板'),
|
||
(NULL, 'prompt_qa_gen_image', '...', '图像问答对生成 Prompt 模板'),
|
||
(NULL, 'model_default', 'glm-4', '默认 AI 辅助模型'),
|
||
(NULL, 'video_frame_interval', '30', '视频帧模式抽帧间隔(帧数)'),
|
||
(NULL, 'token_ttl_seconds', '7200', 'Token 有效期(秒)'),
|
||
(NULL, 'glm_api_base_url', 'http://ai-service:8000', 'GLM API 地址');
|
||
|
||
-- =============================================
|
||
-- 9. sys_operation_log — 操作审计日志(只追加,按月分区)
|
||
-- =============================================
|
||
CREATE TABLE sys_operation_log (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT REFERENCES sys_company(id),
|
||
operator_id BIGINT REFERENCES sys_user(id), -- 登录失败时可为 NULL
|
||
operator_name VARCHAR(50) NOT NULL, -- 操作时用户名快照
|
||
operation_type VARCHAR(50) NOT NULL,
|
||
target_type VARCHAR(30),
|
||
target_id BIGINT,
|
||
detail JSONB,
|
||
ip_address VARCHAR(50),
|
||
result VARCHAR(10) NOT NULL, -- SUCCESS / FAIL
|
||
error_message TEXT,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
) PARTITION BY RANGE (created_at);
|
||
|
||
-- 初始分区(建议使用 pg_partman 自动维护)
|
||
CREATE TABLE sys_operation_log_2026_04
|
||
PARTITION OF sys_operation_log
|
||
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
|
||
|
||
CREATE TABLE sys_operation_log_2026_05
|
||
PARTITION OF sys_operation_log
|
||
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
|
||
|
||
-- =============================================
|
||
-- 10. annotation_task_history — 任务流转历史(只追加)
|
||
-- =============================================
|
||
CREATE TABLE annotation_task_history (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
|
||
from_status VARCHAR(20), -- 任务初建时为 NULL
|
||
to_status VARCHAR(20) NOT NULL,
|
||
operator_id BIGINT NOT NULL REFERENCES sys_user(id),
|
||
operator_role VARCHAR(20) NOT NULL, -- 操作时角色快照
|
||
note TEXT, -- 驳回原因、强制转移说明等
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_task_history_task ON annotation_task_history(task_id);
|
||
|
||
-- =============================================
|
||
-- 11. video_process_job — 视频异步处理任务表
|
||
-- =============================================
|
||
CREATE TABLE video_process_job (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
source_id BIGINT NOT NULL REFERENCES source_data(id),
|
||
job_type VARCHAR(20) NOT NULL, -- FRAME_EXTRACT / VIDEO_TO_TEXT
|
||
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
|
||
-- PENDING / RUNNING / SUCCESS / FAILED / RETRYING
|
||
params JSONB NOT NULL,
|
||
total_units INT,
|
||
processed_units INT NOT NULL DEFAULT 0,
|
||
output_path VARCHAR(500),
|
||
retry_count INT NOT NULL DEFAULT 0,
|
||
max_retries INT NOT NULL DEFAULT 3,
|
||
error_message TEXT,
|
||
started_at TIMESTAMP,
|
||
completed_at TIMESTAMP,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_video_process_job_source ON video_process_job(source_id);
|
||
CREATE INDEX idx_video_process_job_status ON video_process_job(status);
|
||
```
|
||
|
||
[↑ 返回目录](#目录)
|
||
|
||
---
|
||
|
||
## 三、公共基础设施
|
||
|
||
### 3.1 统一响应封装
|
||
|
||
```java
|
||
// com/label/common/result/Result.java
|
||
public record Result<T>(String code, T data, String message) {
|
||
public static <T> Result<T> ok(T data) {
|
||
return new Result<>("SUCCESS", data, null);
|
||
}
|
||
public static Result<Void> ok() {
|
||
return new Result<>("SUCCESS", null, null);
|
||
}
|
||
public static <T> Result<T> fail(String code, String message) {
|
||
return new Result<>(code, null, message);
|
||
}
|
||
}
|
||
|
||
// com/label/common/result/PageResult.java
|
||
public record PageResult<T>(List<T> items, long total, int page, int pageSize) {}
|
||
```
|
||
|
||
所有 Controller 返回 `Result<T>`,禁止直接返回裸 POJO 或裸 List。分页接口返回 `Result<PageResult<T>>`。
|
||
|
||
### 3.2 全局异常处理
|
||
|
||
```java
|
||
// com/label/common/exception/GlobalExceptionHandler.java
|
||
@RestControllerAdvice
|
||
public class GlobalExceptionHandler {
|
||
|
||
@ExceptionHandler(BusinessException.class)
|
||
public ResponseEntity<Result<?>> handleBusiness(BusinessException ex) {
|
||
return ResponseEntity.status(ex.getHttpStatus())
|
||
.body(Result.fail(ex.getCode(), ex.getMessage()));
|
||
}
|
||
|
||
@ExceptionHandler(UnauthorizedException.class)
|
||
public ResponseEntity<Result<?>> handleUnauthorized() {
|
||
return ResponseEntity.status(403)
|
||
.body(Result.fail("FORBIDDEN", "权限不足"));
|
||
}
|
||
|
||
@ExceptionHandler(Exception.class)
|
||
public ResponseEntity<Result<?>> handleUnexpected(Exception ex, HttpServletRequest req) {
|
||
log.error("Unexpected error on {}", req.getRequestURI(), ex);
|
||
// 生产响应禁止包含堆栈跟踪
|
||
return ResponseEntity.status(500)
|
||
.body(Result.fail("INTERNAL_ERROR", "服务器内部错误"));
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.3 多租户 CompanyContext(ThreadLocal)
|
||
|
||
```java
|
||
// com/label/common/context/CompanyContext.java
|
||
public final class CompanyContext {
|
||
private static final ThreadLocal<Long> COMPANY_ID = new ThreadLocal<>();
|
||
|
||
public static void set(Long companyId) { COMPANY_ID.set(companyId); }
|
||
public static Long get() { return COMPANY_ID.get(); }
|
||
public static void clear() { COMPANY_ID.remove(); } // 必须在 finally 块调用
|
||
}
|
||
```
|
||
|
||
在 Shiro `TokenFilter` 的 `executeLogin` 成功后注入;在过滤器的 `finally` 块调用 `CompanyContext.clear()`,防止线程池复用时数据串漏至其他租户。
|
||
|
||
**禁止**调用方通过请求体传入 `company_id` 作为参数。所有 Service 通过 `CompanyContext.get()` 获取。
|
||
|
||
MyBatis Plus 配置 `TenantLineInnerInterceptor` 自动向所有 Mapper 查询注入 `company_id = ?` 条件:
|
||
|
||
```java
|
||
@Bean
|
||
public MybatisPlusInterceptor mybatisPlusInterceptor() {
|
||
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
|
||
interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(
|
||
new TenantLineHandler() {
|
||
public Expression getTenantId() {
|
||
return new LongValue(CompanyContext.get());
|
||
}
|
||
public String getTenantIdColumn() { return "company_id"; }
|
||
}
|
||
));
|
||
return interceptor;
|
||
}
|
||
```
|
||
|
||
### 3.4 Shiro 配置
|
||
|
||
#### TokenFilter(继承 AuthenticatingFilter)
|
||
|
||
```java
|
||
// com/label/common/shiro/TokenFilter.java
|
||
public class TokenFilter extends AuthenticatingFilter {
|
||
|
||
@Override
|
||
protected AuthenticationToken createToken(ServletRequest req, ServletResponse res) {
|
||
String token = extractToken((HttpServletRequest) req);
|
||
return new TokenAuthenticationToken(token);
|
||
}
|
||
|
||
@Override
|
||
protected boolean onAccessDenied(ServletRequest req, ServletResponse res) throws Exception {
|
||
String token = extractToken((HttpServletRequest) req);
|
||
if (token == null) {
|
||
sendUnauthorized(res, "缺少 Token");
|
||
return false;
|
||
}
|
||
return executeLogin(req, res);
|
||
}
|
||
|
||
@Override
|
||
protected boolean onLoginSuccess(AuthenticationToken token, Subject subject,
|
||
ServletRequest req, ServletResponse res) throws Exception {
|
||
UserSession session = (UserSession) subject.getPrincipal();
|
||
// 滑动过期:每次有效请求重置 TTL
|
||
long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
|
||
redisTemplate.expire(RedisKeyManager.tokenKey(session.getToken()), ttl, SECONDS);
|
||
// 注入租户上下文(在整个 Filter 链执行期间有效,由 doFilterInternal 的 finally 清理)
|
||
CompanyContext.set(session.getCompanyId());
|
||
return true; // 继续执行 Filter 链(进入 Controller)
|
||
}
|
||
|
||
// ⚠️ 必须在此处清理 ThreadLocal,而非 onLoginSuccess——
|
||
// onLoginSuccess 返回 true 后 Filter 链才继续执行,此时 Controller 尚未运行。
|
||
// doFilterInternal 的 finally 块保证请求结束后(无论正常还是异常)都清理上下文。
|
||
@Override
|
||
protected void doFilterInternal(ServletRequest req, ServletResponse res, FilterChain chain)
|
||
throws ServletException, IOException {
|
||
try {
|
||
super.doFilterInternal(req, res, chain);
|
||
} finally {
|
||
CompanyContext.clear();
|
||
}
|
||
}
|
||
|
||
private String extractToken(HttpServletRequest req) {
|
||
String header = req.getHeader("Authorization");
|
||
if (header != null && header.startsWith("Bearer ")) {
|
||
return header.substring(7);
|
||
}
|
||
return null;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### UserRealm(继承 AuthorizingRealm)
|
||
|
||
```java
|
||
// com/label/common/shiro/UserRealm.java
|
||
public class UserRealm extends AuthorizingRealm {
|
||
|
||
// 认证:从 Redis 验证 token:{uuid} 是否存在
|
||
@Override
|
||
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) {
|
||
String uuid = ((TokenAuthenticationToken) token).getToken();
|
||
Map<Object, Object> data = redisTemplate.opsForHash()
|
||
.entries(RedisKeyManager.tokenKey(uuid));
|
||
if (data.isEmpty()) throw new UnknownAccountException("Token 无效或已过期");
|
||
UserSession session = UserSession.from(uuid, data);
|
||
return new SimpleAuthenticationInfo(session, uuid, getName());
|
||
}
|
||
|
||
// 鉴权:先查 Redis user:perm:{userId},未命中查 PostgreSQL
|
||
@Override
|
||
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
|
||
UserSession session = (UserSession) principals.getPrimaryPrincipal();
|
||
String permKey = RedisKeyManager.userPermKey(session.getUserId());
|
||
String cachedRole = (String) redisTemplate.opsForValue().get(permKey);
|
||
if (cachedRole == null) {
|
||
SysUser user = sysUserMapper.selectById(session.getUserId());
|
||
cachedRole = user.getRole();
|
||
redisTemplate.opsForValue().set(permKey, cachedRole, 5, MINUTES);
|
||
}
|
||
SimpleAuthorizationInfo info = new SimpleAuthorizationInfo();
|
||
info.addRole(cachedRole);
|
||
// 角色继承:高级角色包含所有低级角色权限
|
||
addInheritedRoles(info, cachedRole);
|
||
return info;
|
||
}
|
||
|
||
private void addInheritedRoles(SimpleAuthorizationInfo info, String role) {
|
||
// ADMIN ⊃ REVIEWER ⊃ ANNOTATOR ⊃ UPLOADER
|
||
switch (role) {
|
||
case "ADMIN" -> info.addRoles(Set.of("REVIEWER", "ANNOTATOR", "UPLOADER"));
|
||
case "REVIEWER" -> info.addRoles(Set.of("ANNOTATOR", "UPLOADER"));
|
||
case "ANNOTATOR" -> info.addRoles(Set.of("UPLOADER"));
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### ShiroConfig 过滤器链
|
||
|
||
```java
|
||
@Bean
|
||
public ShiroFilterChainDefinition shiroFilterChainDefinition() {
|
||
DefaultShiroFilterChainDefinition chain = new DefaultShiroFilterChainDefinition();
|
||
chain.addPathDefinition("/api/auth/login", "anon");
|
||
chain.addPathDefinition("/api/**", "tokenFilter");
|
||
return chain;
|
||
}
|
||
```
|
||
|
||
权限声明使用注解,禁止在 Service 内使用 if-role 临时判断:
|
||
|
||
```java
|
||
@RequiresRoles("ADMIN")
|
||
public void createTask(...)
|
||
|
||
@RequiresRoles(value = {"ANNOTATOR", "REVIEWER", "ADMIN"}, logical = Logical.OR)
|
||
public void claimTask(...)
|
||
```
|
||
|
||
#### 角色变更立即驱逐缓存
|
||
|
||
```java
|
||
// UserService.updateRole() 内
|
||
@Transactional
|
||
public void updateRole(Long userId, String newRole) {
|
||
userMapper.updateRole(userId, newRole);
|
||
// 立即驱逐,不等 TTL 自然过期(延迟在禁用高权限账号时存在安全窗口)
|
||
redisTemplate.delete(RedisKeyManager.userPermKey(userId));
|
||
}
|
||
```
|
||
|
||
### 3.5 Redis Key 管理
|
||
|
||
```java
|
||
// com/label/common/redis/RedisKeyManager.java
|
||
public final class RedisKeyManager {
|
||
public static String tokenKey(String uuid) { return "token:" + uuid; }
|
||
public static String userPermKey(Long userId) { return "user:perm:" + userId; }
|
||
public static String taskClaimKey(Long taskId){ return "task:claim:" + taskId; }
|
||
}
|
||
```
|
||
|
||
禁止在上述三类命名空间之外自造 Key 用于认证、权限或锁目的。
|
||
|
||
### 3.6 AOP 审计日志切面
|
||
|
||
```java
|
||
// com/label/common/aop/OperationLog.java(注解)
|
||
@Target(ElementType.METHOD)
|
||
@Retention(RetentionPolicy.RUNTIME)
|
||
public @interface OperationLog {
|
||
String type(); // 对应 operation_type 枚举值,如 "TASK_CLAIM"
|
||
String targetType() default "";
|
||
}
|
||
|
||
// com/label/common/aop/AuditAspect.java
|
||
@Aspect
|
||
@Component
|
||
public class AuditAspect {
|
||
|
||
@Around("@annotation(operationLog)")
|
||
public Object audit(ProceedingJoinPoint pjp, OperationLog operationLog) throws Throwable {
|
||
String result = "SUCCESS";
|
||
String errorMsg = null;
|
||
Object returnVal = null;
|
||
try {
|
||
returnVal = pjp.proceed();
|
||
} catch (Throwable ex) {
|
||
result = "FAIL";
|
||
errorMsg = ex.getMessage();
|
||
throw ex;
|
||
} finally {
|
||
// 业务事务已 commit/rollback 后,以独立操作写入审计记录
|
||
// 审计写入失败只记录 error 日志,禁止回滚业务事务
|
||
try {
|
||
saveAuditLog(operationLog.type(), operationLog.targetType(), result, errorMsg);
|
||
} catch (Exception auditEx) {
|
||
log.error("审计日志写入失败: type={}", operationLog.type(), auditEx);
|
||
}
|
||
}
|
||
return returnVal;
|
||
}
|
||
|
||
private void saveAuditLog(String type, String targetType, String result, String errorMsg) {
|
||
UserSession session = getCurrentSession();
|
||
SysOperationLog log = SysOperationLog.builder()
|
||
.companyId(CompanyContext.get())
|
||
.operatorId(session != null ? session.getUserId() : null)
|
||
.operatorName(session != null ? session.getUsername() : "unknown")
|
||
.operationType(type)
|
||
.targetType(targetType.isEmpty() ? null : targetType)
|
||
.result(result)
|
||
.errorMessage(errorMsg)
|
||
.ipAddress(getClientIp())
|
||
.build();
|
||
operationLogMapper.insert(log);
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.7 RustFS S3 客户端
|
||
|
||
RustFS 实现 S3 兼容接口,使用 AWS SDK for Java v2 连接:
|
||
|
||
```java
|
||
// com/label/common/storage/RustFsClient.java
|
||
@Component
|
||
public class RustFsClient {
|
||
|
||
private final S3Client s3Client; // 配置 endpoint 指向 RustFS 地址
|
||
|
||
public String upload(String bucket, String key, InputStream data, long size) {
|
||
PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).build();
|
||
s3Client.putObject(req, RequestBody.fromInputStream(data, size));
|
||
return key;
|
||
}
|
||
|
||
public InputStream download(String bucket, String key) {
|
||
GetObjectRequest req = GetObjectRequest.builder().bucket(bucket).key(key).build();
|
||
return s3Client.getObject(req);
|
||
}
|
||
|
||
public void delete(String bucket, String key) {
|
||
s3Client.deleteObject(b -> b.bucket(bucket).key(key));
|
||
}
|
||
|
||
public String getPresignedUrl(String bucket, String key, Duration expiry) {
|
||
S3Presigner presigner = S3Presigner.builder().endpointOverride(endpointUri).build();
|
||
return presigner.presignGetObject(b -> b.signatureDuration(expiry)
|
||
.getObjectRequest(r -> r.bucket(bucket).key(key))).url().toString();
|
||
}
|
||
}
|
||
```
|
||
|
||
**对象存储路径规范:**
|
||
|
||
| 资源类型 | 存储桶 | 路径格式 |
|
||
|----------|--------|----------|
|
||
| 上传文本文件 | `source-data` | `text/{yyyyMM}/{source_id}.txt` |
|
||
| 上传图片 | `source-data` | `image/{yyyyMM}/{source_id}.jpg` |
|
||
| 上传视频 | `source-data` | `video/{yyyyMM}/{source_id}.mp4` |
|
||
| 视频帧图 | `source-data` | `frames/{source_id}/{frame_index}.jpg` |
|
||
| 视频片段转译文本 | `source-data` | `video-text/{parent_source_id}/{timestamp}.txt` |
|
||
| bbox 裁剪图 | `source-data` | `crops/{task_id}/{item_index}.jpg` |
|
||
| 导出 JSONL | `finetune-export` | `export/{batchUuid}.jsonl` |
|
||
|
||
### 3.8 AI 服务 HTTP 客户端
|
||
|
||
Spring Boot 3 使用 `RestClient` 封装对 Python FastAPI 服务的同步 HTTP 调用:
|
||
|
||
```java
|
||
// com/label/common/ai/AiServiceClient.java
|
||
@Component
|
||
public class AiServiceClient {
|
||
|
||
private final RestClient restClient;
|
||
|
||
public AiServiceClient(@Value("${ai.service.base-url}") String baseUrl) {
|
||
this.restClient = RestClient.builder().baseUrl(baseUrl).build();
|
||
}
|
||
|
||
// POST /api/v1/text/extract
|
||
public TextExtractResponse extractText(String filePath, String model) {
|
||
return restClient.post().uri("/api/v1/text/extract")
|
||
.body(Map.of("file_path", filePath, "model", model))
|
||
.retrieve().body(TextExtractResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/image/extract
|
||
public ImageExtractResponse extractImage(String imagePath, String model) {
|
||
return restClient.post().uri("/api/v1/image/extract")
|
||
.body(Map.of("image_path", imagePath, "model", model))
|
||
.retrieve().body(ImageExtractResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/video/extract-frames
|
||
public VideoFramesResponse extractFrames(String videoPath, String frameInterval, String mode) {
|
||
return restClient.post().uri("/api/v1/video/extract-frames")
|
||
.body(Map.of("video_path", videoPath, "frame_interval", frameInterval, "mode", mode))
|
||
.retrieve().body(VideoFramesResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/video/to-text
|
||
public VideoToTextResponse videoToText(String videoPath, int startSec, int endSec, String model) {
|
||
return restClient.post().uri("/api/v1/video/to-text")
|
||
.body(Map.of("video_path", videoPath, "start_sec", startSec, "end_sec", endSec, "model", model))
|
||
.retrieve().body(VideoToTextResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/qa/gen-text
|
||
public QaGenResponse genTextQa(List<?> triples, String model, String promptTemplate) {
|
||
return restClient.post().uri("/api/v1/qa/gen-text")
|
||
.body(Map.of("items", triples, "model", model, "prompt_template", promptTemplate))
|
||
.retrieve().body(QaGenResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/qa/gen-image
|
||
public QaGenResponse genImageQa(List<?> quadruples, String model, String promptTemplate) {
|
||
return restClient.post().uri("/api/v1/qa/gen-image")
|
||
.body(Map.of("items", quadruples, "model", model, "prompt_template", promptTemplate))
|
||
.retrieve().body(QaGenResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/finetune/start
|
||
public FinetuneStartResponse startFinetune(String jsonlUrl, String baseModel, Map<String, Object> params) {
|
||
return restClient.post().uri("/api/v1/finetune/start")
|
||
.body(Map.of("jsonl_url", jsonlUrl, "base_model", baseModel, "params", params))
|
||
.retrieve().body(FinetuneStartResponse.class);
|
||
}
|
||
|
||
// GET /api/v1/finetune/status/{jobId}
|
||
public FinetuneStatusResponse getFinetuneStatus(String jobId) {
|
||
return restClient.get().uri("/api/v1/finetune/status/{jobId}", jobId)
|
||
.retrieve().body(FinetuneStatusResponse.class);
|
||
}
|
||
}
|
||
```
|
||
|
||
[↑ 返回目录](#目录)
|
||
|
||
---
|
||
|
||
## 四、业务模块纵切
|
||
|
||
### 4.1 用户与权限模块
|
||
|
||
**Entity(SysUser)**:字段同 DDL,`passwordHash` 字段加 `@JsonIgnore` 禁止序列化到响应。
|
||
|
||
**AuthService 核心逻辑:**
|
||
|
||
```java
|
||
// 登录
|
||
@OperationLog(type = "USER_LOGIN")
|
||
public String login(Long companyId, String username, String password, String ip) {
|
||
SysUser user = userMapper.selectByCompanyAndUsername(companyId, username);
|
||
if (user == null || !BCrypt.checkpw(password, user.getPasswordHash()))
|
||
throw new BusinessException("USER_NOT_FOUND", "用户名或密码错误");
|
||
if ("DISABLED".equals(user.getStatus()))
|
||
throw new BusinessException("USER_DISABLED", "账号已禁用");
|
||
|
||
String token = UUID.randomUUID().toString();
|
||
Map<String, Object> tokenData = Map.of(
|
||
"userId", user.getId(), "role", user.getRole(), "companyId", user.getCompanyId(),
|
||
"username", user.getUsername()
|
||
);
|
||
long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
|
||
redisTemplate.opsForHash().putAll(RedisKeyManager.tokenKey(token), tokenData);
|
||
redisTemplate.expire(RedisKeyManager.tokenKey(token), ttl, SECONDS);
|
||
return token;
|
||
}
|
||
|
||
// 退出登录
|
||
@OperationLog(type = "USER_LOGOUT")
|
||
public void logout(String token) {
|
||
redisTemplate.delete(RedisKeyManager.tokenKey(token));
|
||
}
|
||
```
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| POST | `/api/auth/login` | 匿名 | 返回 Token(UUID) |
|
||
| POST | `/api/auth/logout` | 已登录 | 删除 Redis Token |
|
||
| GET | `/api/auth/me` | 已登录 | 当前用户信息与角色 |
|
||
| GET | `/api/users` | ADMIN | 分页查询用户列表 |
|
||
| POST | `/api/users` | ADMIN | 创建用户 |
|
||
| PUT | `/api/users/{id}` | ADMIN | 更新用户信息 |
|
||
| PUT | `/api/users/{id}/status` | ADMIN | 启用/禁用账号(同步驱逐权限缓存) |
|
||
| PUT | `/api/users/{id}/role` | ADMIN | 变更角色(同步驱逐权限缓存) |
|
||
|
||
---
|
||
|
||
### 4.2 资料管理模块
|
||
|
||
**SourceService 核心逻辑:**
|
||
|
||
```java
|
||
@OperationLog(type = "SOURCE_UPLOAD")
|
||
@Transactional
|
||
public SourceData upload(MultipartFile file, String dataType) {
|
||
Long companyId = CompanyContext.get();
|
||
// 1. 先创建 source_data 记录获取 ID(用于构造存储路径)
|
||
SourceData sd = SourceData.builder()
|
||
.companyId(companyId).uploaderId(getCurrentUserId())
|
||
.dataType(dataType).fileName(file.getOriginalFilename())
|
||
.fileSize(file.getSize()).bucketName("source-data")
|
||
.status("PENDING").build();
|
||
sourceDataMapper.insert(sd);
|
||
|
||
// 2. 构造路径并上传至 RustFS
|
||
String path = buildPath(dataType, sd.getId());
|
||
rustFsClient.upload("source-data", path, file.getInputStream(), file.getSize());
|
||
|
||
// 3. 更新 file_path
|
||
sd.setFilePath(path);
|
||
sourceDataMapper.updateById(sd);
|
||
return sd;
|
||
}
|
||
|
||
private String buildPath(String dataType, Long sourceId) {
|
||
String ym = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
|
||
return switch (dataType) {
|
||
case "TEXT" -> "text/" + ym + "/" + sourceId + ".txt";
|
||
case "IMAGE" -> "image/" + ym + "/" + sourceId + ".jpg";
|
||
case "VIDEO" -> "video/" + ym + "/" + sourceId + ".mp4";
|
||
default -> throw new BusinessException("INVALID_TYPE", "不支持的资料类型");
|
||
};
|
||
}
|
||
```
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| POST | `/api/source/upload` | UPLOADER | 上传文件,创建 source_data 记录 |
|
||
| GET | `/api/source/list` | UPLOADER | 分页查询(UPLOADER 只见自己,ADMIN 见全部) |
|
||
| GET | `/api/source/{id}` | UPLOADER | 查看资料详情 |
|
||
| DELETE | `/api/source/{id}` | ADMIN | 删除资料及 RustFS 文件 |
|
||
|
||
---
|
||
|
||
### 4.3 任务管理模块(含并发控制)
|
||
|
||
**TaskClaimService.claim() — 双重保障:**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "TASK_CLAIM")
|
||
public void claim(Long taskId) {
|
||
Long userId = getCurrentUserId();
|
||
// 第一重:Redis 分布式锁(TTL 30s,SET NX)
|
||
Boolean locked = redisTemplate.opsForValue()
|
||
.setIfAbsent(RedisKeyManager.taskClaimKey(taskId), userId.toString(), 30, SECONDS);
|
||
if (!Boolean.TRUE.equals(locked))
|
||
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取");
|
||
|
||
// 第二重:数据库乐观约束(WHERE status = 'UNCLAIMED')
|
||
// UPDATE annotation_task SET status='IN_PROGRESS', claimed_by=?, claimed_at=NOW()
|
||
// WHERE id=? AND status='UNCLAIMED' AND company_id=?
|
||
int affected = taskMapper.claimTask(taskId, userId, CompanyContext.get());
|
||
if (affected == 0)
|
||
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取");
|
||
|
||
// 写入任务历史(同一事务)
|
||
insertHistory(taskId, "UNCLAIMED", "IN_PROGRESS", userId, null);
|
||
}
|
||
|
||
public void unclaim(Long taskId) {
|
||
StateValidator.assertTransition(TaskStatus.IN_PROGRESS, TaskStatus.UNCLAIMED, TaskStatus.TRANSITIONS);
|
||
// 更新 claimed_by = NULL, status = UNCLAIMED
|
||
taskMapper.unclaim(taskId, getCurrentUserId(), CompanyContext.get());
|
||
redisTemplate.delete(RedisKeyManager.taskClaimKey(taskId));
|
||
insertHistory(taskId, "IN_PROGRESS", "UNCLAIMED", getCurrentUserId(), null);
|
||
}
|
||
```
|
||
|
||
**每次 status 变更必须在同一事务中调用 `insertHistory()` 写入 `annotation_task_history`。**
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| POST | `/api/tasks` | ADMIN | 为指定 source 创建 EXTRACTION 任务 |
|
||
| GET | `/api/tasks/pool` | ANNOTATOR | 查看可领取任务池(UNCLAIMED 状态)。ANNOTATOR 只看到 EXTRACTION 类型;REVIEWER 只看到 SUBMITTED 状态(即审批队列,与 pending-review 等价);两者均分页,不可无界查询 |
|
||
| POST | `/api/tasks/{id}/claim` | ANNOTATOR | 领取任务(争抢式,Redis SET NX + DB 乐观锁) |
|
||
| POST | `/api/tasks/{id}/unclaim` | ANNOTATOR | 放弃任务,退回任务池 |
|
||
| GET | `/api/tasks/mine` | ANNOTATOR | 查询我领取的任务列表(包含 IN_PROGRESS、SUBMITTED、REJECTED 状态,分页) |
|
||
| POST | `/api/tasks/{id}/reclaim` | ANNOTATOR | 重领被驳回的任务(task.status 必须为 REJECTED 且 claimedBy = 当前用户),状态流转 REJECTED → IN_PROGRESS |
|
||
| GET | `/api/tasks/pending-review` | REVIEWER | 查看待我审批的任务列表(status = SUBMITTED,分页);REVIEWER 的专属审批入口 |
|
||
| GET | `/api/tasks/{id}` | ANNOTATOR | 查看任务详情 |
|
||
| GET | `/api/tasks` | ADMIN | 查询全部任务(支持过滤,分页) |
|
||
| PUT | `/api/tasks/{id}/reassign` | ADMIN | 强制转移任务归属 |
|
||
|
||
---
|
||
|
||
### 4.4 标注工作台模块(EXTRACTION 阶段)
|
||
|
||
**ExtractionService 核心逻辑:**
|
||
|
||
```java
|
||
// AI 辅助预标注(标注员领取任务后调用)
|
||
public AnnotationResult aiPreAnnotate(Long taskId) {
|
||
AnnotationTask task = taskMapper.selectById(taskId);
|
||
SourceData source = sourceDataMapper.selectById(task.getSourceId());
|
||
Object aiResult = "IMAGE".equals(source.getDataType()) || task.getVideoUnitType() != null
|
||
? aiServiceClient.extractImage(resolveImagePath(task), task.getAiModel())
|
||
: aiServiceClient.extractText(source.getFilePath(), task.getAiModel());
|
||
AnnotationResult result = buildAnnotationResult(task, aiResult);
|
||
annotationResultMapper.insertOrReplace(result);
|
||
return result;
|
||
}
|
||
|
||
// 更新提取结果(整体覆盖,PUT 语义)
|
||
public void updateResult(Long taskId, String resultJsonStr) {
|
||
// 验证 JSON 格式合法性
|
||
validateResultJson(resultJsonStr);
|
||
// 整体替换 result_json,禁止局部 PATCH 或逐条追加
|
||
annotationResultMapper.updateResultJson(taskId, resultJsonStr, CompanyContext.get());
|
||
}
|
||
|
||
// 审批通过——两阶段:事务内完成同步步骤,事务提交后异步触发 QA 生成
|
||
@Transactional
|
||
@OperationLog(type = "EXTRACTION_APPROVE")
|
||
public void approve(Long taskId) {
|
||
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
|
||
|
||
// 自审校验:提交者不能审批自己的任务
|
||
if (task.getClaimedBy().equals(getCurrentUserId()))
|
||
throw new BusinessException("SELF_REVIEW_FORBIDDEN", "不允许审批自己提交的任务");
|
||
|
||
AnnotationResult result = annotationResultMapper.selectByTaskId(taskId);
|
||
|
||
// 1. annotation_result.is_final = true
|
||
result.setIsFinal(true);
|
||
annotationResultMapper.updateById(result);
|
||
|
||
// 2. annotation_task.status → APPROVED
|
||
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS);
|
||
task.setStatus("APPROVED");
|
||
task.setCompletedAt(LocalDateTime.now());
|
||
taskMapper.updateById(task);
|
||
|
||
// 3. 写入任务历史
|
||
insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);
|
||
|
||
// 4. 发布领域事件,事务提交后异步执行 QA 生成(步骤 5-7)
|
||
// 注:AI HTTP 调用禁止在 @Transactional 内同步执行——会占用数据库连接直至 AI 响应,
|
||
// 且 AI 失败会错误地回滚已完成的审批。
|
||
// 使用 @TransactionalEventListener(phase = AFTER_COMMIT) 保证先提交再触发。
|
||
eventPublisher.publishEvent(new ExtractionApprovedEvent(taskId, task.getSourceId(),
|
||
getSourceType(task), CompanyContext.get()));
|
||
}
|
||
|
||
// 驳回——状态回退,标注员可重领
|
||
@Transactional
|
||
@OperationLog(type = "EXTRACTION_REJECT")
|
||
public void reject(Long taskId, String reason) {
|
||
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
|
||
|
||
// 自审校验
|
||
if (task.getClaimedBy().equals(getCurrentUserId()))
|
||
throw new BusinessException("SELF_REVIEW_FORBIDDEN", "不允许驳回自己提交的任务");
|
||
|
||
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS);
|
||
task.setStatus("REJECTED");
|
||
taskMapper.updateById(task);
|
||
insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason);
|
||
// source_data.status 保持 EXTRACTING 不变,待标注员重新提交后再推进
|
||
}
|
||
|
||
// ExtractionApprovedEventListener(@TransactionalEventListener,独立事务)
|
||
// 负责 5-7 步:AI 调用 → 写 training_dataset → 创建 QA 任务 → 更新 source_data
|
||
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
|
||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||
public void onExtractionApproved(ExtractionApprovedEvent event) {
|
||
AnnotationTask task = taskMapper.selectById(event.getTaskId());
|
||
AnnotationResult result = annotationResultMapper.selectByTaskId(event.getTaskId());
|
||
|
||
// 5. 调用 AI 生成候选问答对(在事务外执行,失败不影响审批结果)
|
||
String promptKey = "IMAGE".equals(event.getSourceType()) ? "prompt_qa_gen_image" : "prompt_qa_gen_text";
|
||
String promptTemplate = sysConfigService.get(promptKey);
|
||
QaGenResponse qaResponse = generateQa(task, result, promptTemplate);
|
||
|
||
// 6. 将候选问答对写入 training_dataset(PENDING_REVIEW)
|
||
List<TrainingDataset> samples = buildTrainingSamples(task, result, qaResponse);
|
||
trainingDatasetMapper.batchInsert(samples);
|
||
|
||
// 7. 创建 QA_GENERATION 阶段任务(UNCLAIMED)
|
||
AnnotationTask qaTask = buildQaTask(task);
|
||
taskMapper.insert(qaTask);
|
||
insertHistory(qaTask.getId(), null, "UNCLAIMED", task.getClaimedBy(), null);
|
||
|
||
// 8. source_data.status → QA_REVIEW
|
||
sourceDataMapper.updateStatus(event.getSourceId(), "QA_REVIEW", event.getCompanyId());
|
||
}
|
||
```
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| GET | `/api/extraction/{taskId}` | ANNOTATOR | 获取当前提取结果(含 AI 预标注) |
|
||
| PUT | `/api/extraction/{taskId}` | ANNOTATOR | 更新提取结果(整体 JSONB 覆盖) |
|
||
| POST | `/api/extraction/{taskId}/submit` | ANNOTATOR | 提交提取结果,进入审批队列 |
|
||
| POST | `/api/extraction/{taskId}/approve` | REVIEWER | 审批通过,自动触发 QA 任务创建 |
|
||
| POST | `/api/extraction/{taskId}/reject` | REVIEWER | 驳回,附驳回原因 |
|
||
|
||
---
|
||
|
||
### 4.5 问答生成模块(QA_GENERATION 阶段)
|
||
|
||
`QaService` 的整体覆盖逻辑与 `ExtractionService` 一致(PUT 语义,禁止局部 PATCH)。
|
||
|
||
**approve 级联动作(同一事务):**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "QA_APPROVE")
|
||
public void approve(Long taskId) {
|
||
// 1. 先校验任务合法性(必须在任何 DB 写入之前执行,避免校验失败时数据已被修改)
|
||
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
|
||
|
||
// 自审校验:提交者不能审批自己的任务
|
||
if (task.getClaimedBy().equals(getCurrentUserId()))
|
||
throw new BusinessException("SELF_REVIEW_FORBIDDEN", "不允许审批自己提交的任务");
|
||
|
||
// 2. training_dataset.status → APPROVED
|
||
trainingDatasetMapper.approveByTaskId(taskId, getCurrentUserId(), CompanyContext.get());
|
||
|
||
// 3. annotation_task.status → APPROVED
|
||
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS);
|
||
task.setStatus("APPROVED");
|
||
task.setCompletedAt(LocalDateTime.now());
|
||
taskMapper.updateById(task);
|
||
|
||
// 4. source_data.status → APPROVED(整条流水线完成)
|
||
sourceDataMapper.updateStatus(task.getSourceId(), "APPROVED", CompanyContext.get());
|
||
|
||
// 5. 写入任务历史
|
||
insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);
|
||
}
|
||
|
||
// 驳回问答对——任务退回 IN_PROGRESS,training_dataset 删除候选记录
|
||
@Transactional
|
||
@OperationLog(type = "QA_REJECT")
|
||
public void reject(Long taskId, String reason) {
|
||
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
|
||
|
||
// 自审校验
|
||
if (task.getClaimedBy().equals(getCurrentUserId()))
|
||
throw new BusinessException("SELF_REVIEW_FORBIDDEN", "不允许驳回自己提交的任务");
|
||
|
||
// 删除本次生成的候选问答对(PENDING_REVIEW 状态),待标注员修改后重新提交
|
||
trainingDatasetMapper.deleteByTaskId(taskId, CompanyContext.get());
|
||
|
||
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS);
|
||
task.setStatus("REJECTED");
|
||
taskMapper.updateById(task);
|
||
insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason);
|
||
// source_data.status 保持 QA_REVIEW 不变
|
||
}
|
||
```
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| GET | `/api/qa/{taskId}` | ANNOTATOR | 获取候选问答对列表 |
|
||
| PUT | `/api/qa/{taskId}` | ANNOTATOR | 修改问答对(整体覆盖) |
|
||
| POST | `/api/qa/{taskId}/submit` | ANNOTATOR | 提交问答对,进入审批队列 |
|
||
| POST | `/api/qa/{taskId}/approve` | REVIEWER | 审批通过,写入 training_dataset |
|
||
| POST | `/api/qa/{taskId}/reject` | REVIEWER | 驳回,附驳回原因 |
|
||
|
||
---
|
||
|
||
### 4.6 训练数据导出模块
|
||
|
||
**ExportService.createBatch() 核心逻辑:**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "EXPORT_CREATE")
|
||
public ExportBatch createBatch(List<Long> sampleIds) {
|
||
Long companyId = CompanyContext.get();
|
||
// 1. 校验样本均为 APPROVED 状态
|
||
List<TrainingDataset> samples = trainingDatasetMapper
|
||
.selectByIdsAndStatus(sampleIds, "APPROVED", companyId);
|
||
if (samples.size() != sampleIds.size())
|
||
throw new BusinessException("INVALID_SAMPLES", "部分样本不处于 APPROVED 状态");
|
||
|
||
// 2. 生成 JSONL 内容(每行一个 glm_format_json)
|
||
String batchUuid = UUID.randomUUID().toString();
|
||
String jsonl = samples.stream()
|
||
.map(s -> s.getGlmFormatJson().toString())
|
||
.collect(Collectors.joining("\n"));
|
||
|
||
// 3. 上传至 RustFS:finetune-export/export/{batchUuid}.jsonl
|
||
String path = "export/" + batchUuid + ".jsonl";
|
||
byte[] bytes = jsonl.getBytes(StandardCharsets.UTF_8);
|
||
rustFsClient.upload("finetune-export", path, new ByteArrayInputStream(bytes), bytes.length);
|
||
|
||
// 4. 更新样本 export_batch_id 与 exported_at
|
||
trainingDatasetMapper.batchUpdateExportInfo(sampleIds, batchUuid, LocalDateTime.now());
|
||
|
||
// 5. 写入 export_batch 记录
|
||
ExportBatch batch = ExportBatch.builder()
|
||
.companyId(companyId).batchUuid(batchUuid).datasetFilePath(path)
|
||
.sampleCount(samples.size()).finetuneStatus("NOT_STARTED")
|
||
.createdBy(getCurrentUserId()).build();
|
||
exportBatchMapper.insert(batch);
|
||
return batch;
|
||
}
|
||
```
|
||
|
||
**FinetuneService.trigger():** 调用 `aiServiceClient.startFinetune(...)` 获取 `glmJobId`,更新 `export_batch.glm_job_id` 和 `finetune_status = RUNNING`。
|
||
|
||
**接口清单(全部需要 ADMIN 权限):**
|
||
|
||
| 方法 | 路径 | 说明 |
|
||
|------|------|------|
|
||
| GET | `/api/training/samples` | 分页查询已审批通过、待导出的样本 |
|
||
| POST | `/api/export/batch` | 创建导出批次,合并 JSONL 并上传 RustFS |
|
||
| POST | `/api/export/{batchId}/finetune` | 向 GLM 工厂提交微调任务 |
|
||
| GET | `/api/export/{batchId}/status` | 查询微调任务状态 |
|
||
| GET | `/api/export/list` | 分页查询所有导出批次 |
|
||
|
||
---
|
||
|
||
### 4.7 系统配置模块
|
||
|
||
**SysConfigService.get(configKey):** 先按 `(companyId, configKey)` 查,未命中则按 `(NULL, configKey)` 查全局默认值,公司级配置优先覆盖全局值。
|
||
|
||
```java
|
||
public String get(String configKey) {
|
||
Long companyId = CompanyContext.get();
|
||
// 先查公司级配置
|
||
SysConfig cfg = configMapper.selectByCompanyAndKey(companyId, configKey);
|
||
if (cfg == null) {
|
||
// 回退到全局默认(company_id IS NULL)
|
||
cfg = configMapper.selectByCompanyAndKey(null, configKey);
|
||
}
|
||
return cfg != null ? cfg.getConfigValue() : null;
|
||
}
|
||
```
|
||
|
||
**接口清单(全部需要 ADMIN 权限):**
|
||
|
||
| 方法 | 路径 | 说明 |
|
||
|------|------|------|
|
||
| GET | `/api/config` | 获取所有配置项(公司级 + 全局默认) |
|
||
| PUT | `/api/config/{key}` | 更新单项配置(含 Prompt 模板) |
|
||
|
||
---
|
||
|
||
### 4.8 视频处理模块
|
||
|
||
**VideoProcessService 核心逻辑:**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "TASK_CREATE")
|
||
public VideoProcessJob createJob(Long sourceId, String jobType, JsonNode params) {
|
||
Long companyId = CompanyContext.get();
|
||
// 1. source_data.status → PREPROCESSING
|
||
sourceDataMapper.updateStatus(sourceId, "PREPROCESSING", companyId);
|
||
// 2. 创建 video_process_job(PENDING)
|
||
VideoProcessJob job = VideoProcessJob.builder()
|
||
.companyId(companyId).sourceId(sourceId)
|
||
.jobType(jobType).params(params).status("PENDING").build();
|
||
videoProcessJobMapper.insert(job);
|
||
// 3. 异步调用 AI 服务(非阻塞,由 AI 服务自行回调)
|
||
triggerAiAsync(job);
|
||
return job;
|
||
}
|
||
|
||
// AI 服务回调:幂等处理
|
||
@Transactional
|
||
public void handleCallback(Long jobId, boolean success, String outputPath, String errorMsg) {
|
||
VideoProcessJob job = videoProcessJobMapper.selectById(jobId);
|
||
// 幂等:已是 SUCCESS 状态则静默忽略重复回调,不得重复创建 annotation_task
|
||
if ("SUCCESS".equals(job.getStatus())) return;
|
||
|
||
if (success) {
|
||
job.setStatus("SUCCESS");
|
||
job.setOutputPath(outputPath);
|
||
job.setCompletedAt(LocalDateTime.now());
|
||
// source_data.status → PENDING(进入后续标注流程)
|
||
sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
|
||
} else {
|
||
if (job.getRetryCount() >= job.getMaxRetries()) {
|
||
// 达最大重试次数,置 FAILED,需 ADMIN 手动重置为 PENDING 后才可重新触发
|
||
job.setStatus("FAILED");
|
||
job.setErrorMessage(errorMsg);
|
||
sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
|
||
} else {
|
||
job.setStatus("RETRYING");
|
||
job.setRetryCount(job.getRetryCount() + 1);
|
||
job.setErrorMessage(errorMsg);
|
||
}
|
||
}
|
||
videoProcessJobMapper.updateById(job);
|
||
}
|
||
```
|
||
|
||
[↑ 返回目录](#目录)
|
||
|
||
---
|
||
|
||
## 五、状态机实现规范
|
||
|
||
所有状态变更**必须**经过 `StateValidator.assertTransition()` 校验,禁止绕过直接调用 Mapper 更新状态字段。
|
||
|
||
### 5.1 StateValidator
|
||
|
||
```java
|
||
// com/label/common/statemachine/StateValidator.java
|
||
public final class StateValidator {
|
||
public static <S> void assertTransition(S from, S to, Map<S, Set<S>> transitions) {
|
||
Set<S> allowed = transitions.getOrDefault(from, Set.of());
|
||
if (!allowed.contains(to))
|
||
throw new BusinessException("INVALID_STATE_TRANSITION",
|
||
"非法状态转换: " + from + " → " + to);
|
||
}
|
||
}
|
||
```
|
||
|
||
### 5.2 source_data 状态机
|
||
|
||
```java
|
||
public enum SourceStatus {
|
||
PENDING, PREPROCESSING, EXTRACTING, QA_REVIEW, APPROVED;
|
||
// 注:source_data 无 REJECTED 状态。QA 阶段驳回的是 annotation_task(→ REJECTED),
|
||
// 不改变 source_data.status(保持 QA_REVIEW);重新提交后 source_data 随任务推进。
|
||
|
||
public static final Map<SourceStatus, Set<SourceStatus>> TRANSITIONS = Map.of(
|
||
PENDING, Set.of(EXTRACTING, PREPROCESSING),
|
||
PREPROCESSING, Set.of(PENDING),
|
||
EXTRACTING, Set.of(QA_REVIEW),
|
||
QA_REVIEW, Set.of(APPROVED)
|
||
);
|
||
}
|
||
```
|
||
|
||
### 5.3 annotation_task 状态机
|
||
|
||
```java
|
||
public enum TaskStatus {
|
||
UNCLAIMED, IN_PROGRESS, SUBMITTED, APPROVED, REJECTED;
|
||
|
||
public static final Map<TaskStatus, Set<TaskStatus>> TRANSITIONS = Map.of(
|
||
UNCLAIMED, Set.of(IN_PROGRESS),
|
||
IN_PROGRESS, Set.of(SUBMITTED, UNCLAIMED, IN_PROGRESS),
|
||
// IN_PROGRESS → IN_PROGRESS 用于 ADMIN 强制转移(持有人变更,状态不变)
|
||
SUBMITTED, Set.of(APPROVED, REJECTED),
|
||
REJECTED, Set.of(IN_PROGRESS) // 驳回后重拾
|
||
);
|
||
}
|
||
```
|
||
|
||
### 5.4 training_dataset 状态机
|
||
|
||
```java
|
||
public enum DatasetStatus {
|
||
PENDING_REVIEW, APPROVED, REJECTED;
|
||
|
||
public static final Map<DatasetStatus, Set<DatasetStatus>> TRANSITIONS = Map.of(
|
||
PENDING_REVIEW, Set.of(APPROVED, REJECTED),
|
||
REJECTED, Set.of(PENDING_REVIEW) // 驳回后可修改重提
|
||
);
|
||
}
|
||
```
|
||
|
||
### 5.5 video_process_job 状态机
|
||
|
||
```java
|
||
public enum VideoJobStatus {
|
||
PENDING, RUNNING, SUCCESS, FAILED, RETRYING;
|
||
|
||
public static final Map<VideoJobStatus, Set<VideoJobStatus>> TRANSITIONS = Map.of(
|
||
PENDING, Set.of(RUNNING),
|
||
RUNNING, Set.of(SUCCESS, RETRYING, FAILED),
|
||
RETRYING, Set.of(RUNNING, FAILED)
|
||
// FAILED → PENDING 由 ADMIN 手动触发接口,不在此处声明自动流转
|
||
);
|
||
}
|
||
```
|
||
|
||
[↑ 返回目录](#目录)
|
||
|
||
---
|
||
|
||
## 六、Docker Compose 配置
|
||
|
||
```yaml
|
||
# docker-compose.yml
|
||
version: "3.9"
|
||
|
||
services:
|
||
postgres:
|
||
image: postgres:16-alpine
|
||
environment:
|
||
POSTGRES_DB: label_db
|
||
POSTGRES_USER: label
|
||
POSTGRES_PASSWORD: label_password
|
||
volumes:
|
||
- postgres_data:/var/lib/postgresql/data
|
||
- ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
|
||
ports:
|
||
- "5432:5432"
|
||
healthcheck:
|
||
test: ["CMD-SHELL", "pg_isready -U label -d label_db"]
|
||
interval: 10s
|
||
timeout: 5s
|
||
retries: 5
|
||
|
||
redis:
|
||
image: redis:7-alpine
|
||
command: redis-server --requirepass redis_password
|
||
volumes:
|
||
- redis_data:/data
|
||
ports:
|
||
- "6379:6379"
|
||
healthcheck:
|
||
test: ["CMD", "redis-cli", "-a", "redis_password", "ping"]
|
||
interval: 10s
|
||
timeout: 5s
|
||
retries: 5
|
||
|
||
rustfs:
|
||
image: rustfs/rustfs:latest # 替换为生产可用的实际镜像
|
||
environment:
|
||
RUSTFS_ACCESS_KEY: minioadmin
|
||
RUSTFS_SECRET_KEY: minioadmin
|
||
volumes:
|
||
- rustfs_data:/data
|
||
ports:
|
||
- "9000:9000" # S3 API 端口
|
||
- "9001:9001" # Web 控制台端口
|
||
|
||
backend:
|
||
build:
|
||
context: .
|
||
dockerfile: Dockerfile
|
||
environment:
|
||
SPRING_DATASOURCE_URL: jdbc:postgresql://postgres:5432/label_db
|
||
SPRING_DATASOURCE_USERNAME: label
|
||
SPRING_DATASOURCE_PASSWORD: label_password
|
||
SPRING_REDIS_HOST: redis
|
||
SPRING_REDIS_PORT: 6379
|
||
SPRING_REDIS_PASSWORD: redis_password
|
||
RUSTFS_ENDPOINT: http://rustfs:9000
|
||
RUSTFS_ACCESS_KEY: minioadmin
|
||
RUSTFS_SECRET_KEY: minioadmin
|
||
AI_SERVICE_BASE_URL: http://ai-service:8000
|
||
ports:
|
||
- "8080:8080"
|
||
depends_on:
|
||
postgres:
|
||
condition: service_healthy
|
||
redis:
|
||
condition: service_healthy
|
||
rustfs:
|
||
condition: service_started
|
||
healthcheck:
|
||
test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
|
||
interval: 15s
|
||
retries: 5
|
||
|
||
ai-service:
|
||
build:
|
||
context: ../ai_service # Python FastAPI 服务所在目录
|
||
dockerfile: Dockerfile
|
||
environment:
|
||
RUSTFS_ENDPOINT: http://rustfs:9000
|
||
RUSTFS_ACCESS_KEY: minioadmin
|
||
RUSTFS_SECRET_KEY: minioadmin
|
||
ports:
|
||
- "8000:8000"
|
||
depends_on:
|
||
- rustfs
|
||
|
||
frontend:
|
||
image: nginx:alpine
|
||
volumes:
|
||
- ../frontend/dist:/usr/share/nginx/html:ro
|
||
- ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
|
||
ports:
|
||
- "80:80"
|
||
depends_on:
|
||
backend:
|
||
condition: service_healthy
|
||
|
||
volumes:
|
||
postgres_data:
|
||
redis_data:
|
||
rustfs_data:
|
||
```
|
||
|
||
**Dockerfile(backend):**
|
||
|
||
```dockerfile
|
||
FROM eclipse-temurin:17-jre-alpine
|
||
WORKDIR /app
|
||
COPY target/label-backend-*.jar app.jar
|
||
EXPOSE 8080
|
||
ENTRYPOINT ["java", "-jar", "app.jar"]
|
||
```
|
||
|
||
[↑ 返回目录](#目录)
|
||
|
||
---
|
||
|
||
## 七、测试策略
|
||
|
||
### 7.1 基本原则
|
||
|
||
- 集成测试**必须**使用真实的 PostgreSQL 和 Redis 实例(Testcontainers),禁止仅 Mock 数据库
|
||
- 每个受保护接口组**必须**有至少一个集成测试覆盖 Shiro 认证/鉴权过滤器链(缺少 Token → 401,角色不足 → 403)
|
||
|
||
### 7.2 并发任务领取测试(必须)
|
||
|
||
```java
|
||
@Test
|
||
void concurrentClaimShouldOnlySucceedOnce() throws InterruptedException {
|
||
// 准备:创建一个 UNCLAIMED 任务
|
||
Long taskId = createUnclaimedTask();
|
||
int threadCount = 10;
|
||
CountDownLatch latch = new CountDownLatch(1);
|
||
AtomicInteger successCount = new AtomicInteger(0);
|
||
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
|
||
|
||
for (int i = 0; i < threadCount; i++) {
|
||
pool.submit(() -> {
|
||
try {
|
||
latch.await();
|
||
taskClaimService.claim(taskId);
|
||
successCount.incrementAndGet();
|
||
} catch (BusinessException e) {
|
||
// 预期:其余线程抛出"任务已被领取"
|
||
} catch (InterruptedException e) {
|
||
Thread.currentThread().interrupt();
|
||
}
|
||
});
|
||
}
|
||
latch.countDown();
|
||
pool.awaitTermination(5, SECONDS);
|
||
|
||
// 验证:仅 1 个线程领取成功
|
||
assertThat(successCount.get()).isEqualTo(1);
|
||
AnnotationTask task = taskMapper.selectById(taskId);
|
||
assertThat(task.getStatus()).isEqualTo("IN_PROGRESS");
|
||
assertThat(task.getClaimedBy()).isNotNull();
|
||
}
|
||
```
|
||
|
||
### 7.3 视频回调幂等测试(必须)
|
||
|
||
```java
|
||
@Test
|
||
void duplicateSuccessCallbackShouldBeIdempotent() {
|
||
Long sourceId = createVideoSource();
|
||
VideoProcessJob job = videoProcessService.createJob(sourceId, "FRAME_EXTRACT", params);
|
||
|
||
// 第一次成功回调
|
||
videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);
|
||
// 重复成功回调
|
||
videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);
|
||
|
||
// 验证:只创建一个 annotation_task
|
||
long taskCount = annotationTaskMapper.countBySourceId(sourceId);
|
||
assertThat(taskCount).isEqualTo(1);
|
||
}
|
||
```
|
||
|
||
### 7.4 状态机越界拒绝测试
|
||
|
||
```java
|
||
@Test
|
||
void illegalStateTransitionShouldThrow() {
|
||
// 验证:APPROVED → IN_PROGRESS 被拒绝
|
||
assertThatThrownBy(() ->
|
||
StateValidator.assertTransition(TaskStatus.APPROVED, TaskStatus.IN_PROGRESS, TaskStatus.TRANSITIONS)
|
||
).isInstanceOf(BusinessException.class)
|
||
.hasMessageContaining("非法状态转换");
|
||
}
|
||
```
|
||
|
||
### 7.5 多租户隔离测试
|
||
|
||
```java
|
||
@Test
|
||
void companyACannotAccessCompanyBData() {
|
||
Long sourceIdB = createSourceForCompany(companyB);
|
||
// 以 companyA 身份请求 companyB 的资源
|
||
CompanyContext.set(companyA);
|
||
assertThatThrownBy(() -> sourceService.findById(sourceIdB))
|
||
.isInstanceOf(BusinessException.class);
|
||
}
|
||
```
|
||
|
||
[↑ 返回目录](#目录)
|
||
|
||
---
|
||
|
||
## 八、宪章合规检查清单
|
||
|
||
PR 合并前评审人**必须**逐条核对以下清单:
|
||
|
||
| # | 宪章原则 | 实现位置 | 检查要点 |
|
||
|---|----------|----------|----------|
|
||
| 1 | 环境约束(JDK 17、SB 3、Shiro、MyBatis Plus) | `pom.xml` | 版本号符合约束;无 Spring Security 并行引入 |
|
||
| 2 | 多租户数据隔离 | `TenantLineInnerInterceptor`、`CompanyContext` | 所有 Mapper 自动注入 company_id;ThreadLocal 在 finally 块清理 |
|
||
| 3 | BCrypt 密码、UUID T//oken、滑动过期、禁 JWT | `AuthService`、`TokenFilter`、`RedisKeyManager` | 无明文密码存储;每次有效请求重置 TTL;无 JWT 库引入 |
|
||
| 4 | 分级 RBAC、权限注解、角色变更驱逐缓存 | `UserRealm`、`@RequiresRoles`、`UserService` | 无 if-role 临时判断;`updateRole()` 立即删缓存 |
|
||
| 5 | 双流水线、级联触发 QA 任务、parent_source_id 溯源 | `ExtractionService.approve()` | 级联动作在同一 @Transactional 内;视频转文本的 parent_source_id 不为空 |
|
||
| 6 | 状态机完整性 | `StateValidator`、各 `*Status` 枚举 | 所有状态变更调用 `StateValidator`;无绕过直接写 Mapper 的路径 |
|
||
| 7 | 任务争抢双重保障 | `TaskClaimService.claim()` | Redis SET NX + DB WHERE status='UNCLAIMED' 两道并存 |
|
||
| 8 | 异步任务幂等、重试上限、FAILED 需手动重置 | `VideoProcessService.handleCallback()` | 重复成功回调静默忽略;`retry_count >= max_retries` 置 FAILED |
|
||
| 9 | 只追加审计日志、AOP 切面、审计失败不回滚业务 | `AuditAspect`、`@OperationLog` | `sys_operation_log` 无 UPDATE/DELETE;审计异常仅 error 日志 |
|
||
| 10 | RESTful URL、统一响应格式、分页必须 | `Result<T>`、各 Controller | 无动词路径;无裸 POJO 返回;所有列表接口有分页参数 |
|
||
| 11 | YAGNI:业务在 Service,Controller 只处理 HTTP | 包结构 | Controller 无业务判断逻辑;无未使用的抽象层 |
|
||
|
||
[↑ 返回目录](#目录)
|