新增 §9.5 评审,10 项问题(N–W): - N: sys_config 全局唯一约束修复(NULL != NULL 问题,改为两个局部唯一索引) - O: annotation_result 新增 UNIQUE(task_id) - P: training_dataset.export_batch_id 改为 BIGINT FK - Q: 全部枚举字段添加 CHECK 约束(role/status/phase/task_type) - R: annotation_task_history 补充 operator_name 快照字段 - S: annotation_task 新增 (company_id, source_id) 索引 - T: training_dataset 新增 task_id 索引 - U: sys_user 补充 created_by 字段 - V: source_data 补充 mime_type 字段 - W: 新增 set_updated_at() 触发器,覆盖全部有 updated_at 的表 附:DDL 修复补丁(ALTER TABLE + 触发器),可直接在开发库执行
1762 lines
80 KiB
Markdown
1762 lines
80 KiB
Markdown
# label_backend 开发实施指南
|
||
|
||
**版本:1.0.0 | 日期:2026-04-09 | 依据:backend后台设计.md v2.2 + constitution.md v1.1.0**
|
||
|
||
---
|
||
|
||
## 一、项目总览
|
||
|
||
### 1.1 系统定位
|
||
|
||
label_backend 是知识图谱智能标注平台的后端服务,基于 Spring Boot 3 构建,驱动**文本线**和**图片线**两条标注流水线,将文档、图片、视频原始资料处理为 GLM 微调格式的训练数据集。
|
||
|
||
### 1.2 数据流水线
|
||
|
||
```
|
||
文本线: 文档 → 三元组提取(主语/谓语/宾语 + 原文片段 + 字符偏移)
|
||
→ 问答对生成 → 审批 → 训练样本(GLM 文本格式)
|
||
|
||
图片线: 图片 → 四元组提取(主体/关系/客体/修饰词 + bbox + 裁剪图)
|
||
→ 问答对生成 → 审批 → 训练样本(GLM 图文格式)
|
||
|
||
视频预处理(非独立流水线):
|
||
帧模式: 视频 → AI 抽帧 → 每帧作为图片进入图片线
|
||
片段模式: 视频 → 多模态模型转文本 → 派生 TEXT source_data → 进入文本线
|
||
```
|
||
|
||
### 1.3 技术栈矩阵
|
||
|
||
| 层次 | 技术 | 版本约束 |
|
||
|------|------|----------|
|
||
| 运行时 | JDK | 17(LTS) |
|
||
| 框架 | Spring Boot | ≥ 3.0.x |
|
||
| 认证/鉴权 | Apache Shiro | ≥ 1.13.x(兼容 Spring Boot 3) |
|
||
| ORM | MyBatis Plus | ≥ 3.5.x |
|
||
| 数据库 | PostgreSQL | ≥ 14 |
|
||
| 缓存/锁 | Redis | ≥ 6.x |
|
||
| 对象存储 | RustFS(S3 兼容接口) | 当前稳定版 |
|
||
| AI 服务 | Python FastAPI(HTTP 调用) | JVM 侧仅作 RestClient 调用 |
|
||
| 容器化 | Docker Compose | ≥ 2.x |
|
||
| 构建工具 | Maven | ≥ 3.8 |
|
||
|
||
### 1.4 后端模块清单
|
||
|
||
| 模块 | 职责 |
|
||
|------|------|
|
||
| 用户与权限模块 | 用户管理、Shiro RBAC、Token 认证 |
|
||
| 资料管理模块 | 文件上传至 RustFS、source_data 元数据管理 |
|
||
| 任务池模块 | 任务创建、领取(分布式锁 + 乐观锁)、状态流转 |
|
||
| 标注工作台模块 | 调用 AI 提取三/四元组、annotation_result JSONB 写入 |
|
||
| 问答生成模块 | 调用 AI 生成候选问答对、training_dataset 管理 |
|
||
| 训练数据导出模块 | JSONL 批次导出、GLM 微调对接 |
|
||
| 系统配置模块 | Prompt 模板、模型参数、全局/公司级配置管理 |
|
||
| 视频处理模块 | 异步抽帧 / 转文本任务跟踪、幂等回调处理 |
|
||
|
||
### 1.5 包结构
|
||
|
||
```
|
||
com.label
|
||
├── LabelBackendApplication.java
|
||
├── common/
|
||
│ ├── result/ # Result<T>、ResultCode、PageResult<T>
|
||
│ ├── exception/ # BusinessException、GlobalExceptionHandler
|
||
│ ├── context/ # CompanyContext(ThreadLocal)
|
||
│ ├── shiro/ # TokenFilter、UserRealm、ShiroConfig
|
||
│ ├── redis/ # RedisKeyManager、RedisService
|
||
│ ├── aop/ # AuditAspect、@OperationLog 注解
|
||
│ ├── storage/ # RustFsClient(S3 兼容封装)
|
||
│ ├── ai/ # AiServiceClient(RestClient 封装 8 个端点)
|
||
│ └── statemachine/ # StateValidator、各状态枚举
|
||
├── module/
|
||
│ ├── user/
|
||
│ │ ├── controller/ # AuthController、UserController
|
||
│ │ ├── service/ # AuthService、UserService
|
||
│ │ ├── mapper/ # SysUserMapper、SysCompanyMapper
|
||
│ │ ├── entity/ # SysUser、SysCompany
|
||
│ │ └── dto/ # LoginRequest、UserDTO、UserCreateRequest
|
||
│ ├── source/
|
||
│ │ ├── controller/ # SourceController
|
||
│ │ ├── service/ # SourceService
|
||
│ │ ├── mapper/ # SourceDataMapper
|
||
│ │ ├── entity/ # SourceData
|
||
│ │ └── dto/ # SourceUploadResponse、SourceListQuery
|
||
│ ├── task/
|
||
│ │ ├── controller/ # TaskController
|
||
│ │ ├── service/ # TaskService、TaskClaimService
|
||
│ │ ├── mapper/ # AnnotationTaskMapper、TaskHistoryMapper
|
||
│ │ ├── entity/ # AnnotationTask、AnnotationTaskHistory
|
||
│ │ └── dto/ # TaskCreateRequest、TaskPoolQuery、TaskDetailDTO
|
||
│ ├── annotation/
|
||
│ │ ├── controller/ # ExtractionController、QaController
|
||
│ │ ├── service/ # ExtractionService、QaService
|
||
│ │ ├── mapper/ # AnnotationResultMapper、TrainingDatasetMapper
|
||
│ │ ├── entity/ # AnnotationResult、TrainingDataset
|
||
│ │ └── dto/ # ExtractionResultDTO、QaItemDTO、RejectRequest
|
||
│ ├── export/
|
||
│ │ ├── controller/ # ExportController
|
||
│ │ ├── service/ # ExportService、FinetuneService
|
||
│ │ ├── mapper/ # ExportBatchMapper
|
||
│ │ ├── entity/ # ExportBatch
|
||
│ │ └── dto/ # ExportBatchRequest、FinetuneRequest
|
||
│ ├── config/
|
||
│ │ ├── controller/ # SysConfigController
|
||
│ │ ├── service/ # SysConfigService
|
||
│ │ ├── mapper/ # SysConfigMapper
|
||
│ │ ├── entity/ # SysConfig
|
||
│ │ └── dto/ # ConfigUpdateRequest
|
||
│ └── video/
|
||
│ ├── controller/ # VideoController
|
||
│ ├── service/ # VideoProcessService
|
||
│ ├── mapper/ # VideoProcessJobMapper
|
||
│ ├── entity/ # VideoProcessJob
|
||
│ └── dto/ # VideoProcessRequest、VideoCallbackRequest
|
||
```
|
||
|
||
---
|
||
|
||
## 二、数据库 DDL
|
||
|
||
> 执行顺序:sys_company → sys_user → source_data → annotation_task → annotation_result → training_dataset → export_batch → sys_config → sys_operation_log → annotation_task_history → video_process_job
|
||
|
||
```sql
|
||
-- =============================================
|
||
-- 1. sys_company — 公司表(多租户根节点)
|
||
-- =============================================
|
||
CREATE TABLE sys_company (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_name VARCHAR(100) NOT NULL UNIQUE,
|
||
company_code VARCHAR(50) NOT NULL UNIQUE,
|
||
status VARCHAR(10) NOT NULL DEFAULT 'ACTIVE', -- ACTIVE / DISABLED
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
|
||
-- =============================================
|
||
-- 2. sys_user — 用户表
|
||
-- =============================================
|
||
CREATE TABLE sys_user (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
username VARCHAR(50) NOT NULL,
|
||
password_hash VARCHAR(255) NOT NULL, -- BCrypt,强度因子 >= 10
|
||
real_name VARCHAR(50),
|
||
role VARCHAR(20) NOT NULL, -- UPLOADER / ANNOTATOR / REVIEWER / ADMIN
|
||
status VARCHAR(10) NOT NULL DEFAULT 'ACTIVE',
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
CONSTRAINT uq_company_username UNIQUE (company_id, username)
|
||
);
|
||
CREATE INDEX idx_sys_user_company ON sys_user(company_id);
|
||
|
||
-- =============================================
|
||
-- 3. source_data — 原始资料元数据表
|
||
-- =============================================
|
||
CREATE TABLE source_data (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
uploader_id BIGINT REFERENCES sys_user(id),
|
||
data_type VARCHAR(20) NOT NULL, -- TEXT / IMAGE / VIDEO
|
||
file_path VARCHAR(500) NOT NULL,
|
||
file_name VARCHAR(255) NOT NULL,
|
||
file_size BIGINT,
|
||
bucket_name VARCHAR(100) NOT NULL,
|
||
parent_source_id BIGINT REFERENCES source_data(id), -- 视频转文本时指向原视频
|
||
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
|
||
-- PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED / REJECTED
|
||
reject_reason TEXT,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_source_data_company ON source_data(company_id);
|
||
CREATE INDEX idx_source_data_status ON source_data(company_id, status);
|
||
CREATE INDEX idx_source_data_parent ON source_data(parent_source_id);
|
||
|
||
-- =============================================
|
||
-- 4. annotation_task — 标注任务表
|
||
-- =============================================
|
||
CREATE TABLE annotation_task (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
source_id BIGINT NOT NULL REFERENCES source_data(id),
|
||
phase VARCHAR(20) NOT NULL, -- EXTRACTION / QA_GENERATION
|
||
task_type VARCHAR(20) NOT NULL, -- AI_ASSISTED / MANUAL
|
||
ai_model VARCHAR(50),
|
||
video_unit_type VARCHAR(20), -- FRAME(仅视频帧模式填写,其余为空)
|
||
video_unit_info JSONB, -- {"frame_index":150,"time_sec":5.0,"frame_path":"..."}
|
||
claimed_by BIGINT REFERENCES sys_user(id),
|
||
claimed_at TIMESTAMP,
|
||
status VARCHAR(20) NOT NULL DEFAULT 'UNCLAIMED',
|
||
-- UNCLAIMED / IN_PROGRESS / SUBMITTED / APPROVED / REJECTED
|
||
reject_reason TEXT,
|
||
submitted_at TIMESTAMP,
|
||
completed_at TIMESTAMP,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_annotation_task_company ON annotation_task(company_id);
|
||
CREATE INDEX idx_annotation_task_pool ON annotation_task(company_id, phase, status);
|
||
CREATE INDEX idx_annotation_task_claimed ON annotation_task(claimed_by, status);
|
||
|
||
-- =============================================
|
||
-- 5. annotation_result — 标注结果表(EXTRACTION 阶段,JSONB 聚合)
|
||
-- =============================================
|
||
CREATE TABLE annotation_result (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
|
||
result_json JSONB NOT NULL, -- 整体覆盖,禁止局部 PATCH
|
||
is_final BOOLEAN NOT NULL DEFAULT FALSE,
|
||
submitted_by BIGINT REFERENCES sys_user(id),
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_annotation_result_task ON annotation_result(task_id);
|
||
CREATE INDEX idx_annotation_result_final ON annotation_result(company_id, is_final);
|
||
|
||
-- =============================================
|
||
-- 6. training_dataset — 训练样本表(问答对终态)
|
||
-- =============================================
|
||
CREATE TABLE training_dataset (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
|
||
source_id BIGINT NOT NULL REFERENCES source_data(id),
|
||
extraction_result_id BIGINT NOT NULL REFERENCES annotation_result(id),
|
||
sample_type VARCHAR(20) NOT NULL, -- TEXT / IMAGE / VIDEO_FRAME
|
||
glm_format_json JSONB NOT NULL, -- GLM 微调格式,JSONB 支持字段级查询
|
||
export_batch_id VARCHAR(50), -- 未导出时为空
|
||
status VARCHAR(20) NOT NULL DEFAULT 'PENDING_REVIEW',
|
||
-- PENDING_REVIEW / APPROVED / REJECTED
|
||
reject_reason TEXT,
|
||
reviewed_by BIGINT REFERENCES sys_user(id),
|
||
exported_at TIMESTAMP,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_training_dataset_company ON training_dataset(company_id);
|
||
CREATE INDEX idx_training_dataset_status ON training_dataset(company_id, status);
|
||
CREATE INDEX idx_training_dataset_batch ON training_dataset(export_batch_id);
|
||
|
||
-- =============================================
|
||
-- 7. export_batch — 导出批次表
|
||
-- =============================================
|
||
CREATE TABLE export_batch (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
batch_uuid VARCHAR(50) NOT NULL UNIQUE,
|
||
dataset_file_path VARCHAR(500),
|
||
sample_count INT NOT NULL DEFAULT 0,
|
||
glm_job_id VARCHAR(100), -- 调用微调接口后填写,初始为 NULL
|
||
finetune_status VARCHAR(20) NOT NULL DEFAULT 'NOT_STARTED',
|
||
-- NOT_STARTED / RUNNING / SUCCESS / FAILED
|
||
error_message TEXT,
|
||
created_by BIGINT REFERENCES sys_user(id),
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_export_batch_company ON export_batch(company_id);
|
||
|
||
-- =============================================
|
||
-- 8. sys_config — 系统配置表(支持全局默认 + 公司级覆盖)
|
||
-- =============================================
|
||
CREATE TABLE sys_config (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT REFERENCES sys_company(id), -- NULL = 全局默认配置
|
||
config_key VARCHAR(100) NOT NULL,
|
||
config_value TEXT NOT NULL,
|
||
description TEXT,
|
||
updated_by BIGINT REFERENCES sys_user(id),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
CONSTRAINT uq_config_company_key UNIQUE (company_id, config_key)
|
||
);
|
||
|
||
-- 预置全局配置项
|
||
INSERT INTO sys_config (company_id, config_key, config_value, description) VALUES
|
||
(NULL, 'prompt_extract_text', '...', '文本三元组提取 Prompt 模板'),
|
||
(NULL, 'prompt_extract_image', '...', '图像四元组提取 Prompt 模板'),
|
||
(NULL, 'prompt_video_to_text', '...', '视频转文本 Prompt 模板'),
|
||
(NULL, 'prompt_qa_gen_text', '...', '文本问答对生成 Prompt 模板'),
|
||
(NULL, 'prompt_qa_gen_image', '...', '图像问答对生成 Prompt 模板'),
|
||
(NULL, 'model_default', 'glm-4', '默认 AI 辅助模型'),
|
||
(NULL, 'video_frame_interval', '30', '视频帧模式抽帧间隔(帧数)'),
|
||
(NULL, 'token_ttl_seconds', '7200', 'Token 有效期(秒)'),
|
||
(NULL, 'glm_api_base_url', 'http://ai-service:8000', 'GLM API 地址');
|
||
|
||
-- =============================================
|
||
-- 9. sys_operation_log — 操作审计日志(只追加,按月分区)
|
||
-- =============================================
|
||
CREATE TABLE sys_operation_log (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT REFERENCES sys_company(id),
|
||
operator_id BIGINT REFERENCES sys_user(id), -- 登录失败时可为 NULL
|
||
operator_name VARCHAR(50) NOT NULL, -- 操作时用户名快照
|
||
operation_type VARCHAR(50) NOT NULL,
|
||
target_type VARCHAR(30),
|
||
target_id BIGINT,
|
||
detail JSONB,
|
||
ip_address VARCHAR(50),
|
||
result VARCHAR(10) NOT NULL, -- SUCCESS / FAIL
|
||
error_message TEXT,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
) PARTITION BY RANGE (created_at);
|
||
|
||
-- 初始分区(建议使用 pg_partman 自动维护)
|
||
CREATE TABLE sys_operation_log_2026_04
|
||
PARTITION OF sys_operation_log
|
||
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');
|
||
|
||
CREATE TABLE sys_operation_log_2026_05
|
||
PARTITION OF sys_operation_log
|
||
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
|
||
|
||
-- =============================================
|
||
-- 10. annotation_task_history — 任务流转历史(只追加)
|
||
-- =============================================
|
||
CREATE TABLE annotation_task_history (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
task_id BIGINT NOT NULL REFERENCES annotation_task(id),
|
||
from_status VARCHAR(20), -- 任务初建时为 NULL
|
||
to_status VARCHAR(20) NOT NULL,
|
||
operator_id BIGINT NOT NULL REFERENCES sys_user(id),
|
||
operator_role VARCHAR(20) NOT NULL, -- 操作时角色快照
|
||
note TEXT, -- 驳回原因、强制转移说明等
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_task_history_task ON annotation_task_history(task_id);
|
||
|
||
-- =============================================
|
||
-- 11. video_process_job — 视频异步处理任务表
|
||
-- =============================================
|
||
CREATE TABLE video_process_job (
|
||
id BIGSERIAL PRIMARY KEY,
|
||
company_id BIGINT NOT NULL REFERENCES sys_company(id),
|
||
source_id BIGINT NOT NULL REFERENCES source_data(id),
|
||
job_type VARCHAR(20) NOT NULL, -- FRAME_EXTRACT / VIDEO_TO_TEXT
|
||
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
|
||
-- PENDING / RUNNING / SUCCESS / FAILED / RETRYING
|
||
params JSONB NOT NULL,
|
||
total_units INT,
|
||
processed_units INT NOT NULL DEFAULT 0,
|
||
output_path VARCHAR(500),
|
||
retry_count INT NOT NULL DEFAULT 0,
|
||
max_retries INT NOT NULL DEFAULT 3,
|
||
error_message TEXT,
|
||
started_at TIMESTAMP,
|
||
completed_at TIMESTAMP,
|
||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
|
||
);
|
||
CREATE INDEX idx_video_process_job_source ON video_process_job(source_id);
|
||
CREATE INDEX idx_video_process_job_status ON video_process_job(status);
|
||
```
|
||
|
||
---
|
||
|
||
## 三、公共基础设施
|
||
|
||
### 3.1 统一响应封装
|
||
|
||
```java
|
||
// com/label/common/result/Result.java
|
||
public record Result<T>(String code, T data, String message) {
|
||
public static <T> Result<T> ok(T data) {
|
||
return new Result<>("SUCCESS", data, null);
|
||
}
|
||
public static Result<Void> ok() {
|
||
return new Result<>("SUCCESS", null, null);
|
||
}
|
||
public static <T> Result<T> fail(String code, String message) {
|
||
return new Result<>(code, null, message);
|
||
}
|
||
}
|
||
|
||
// com/label/common/result/PageResult.java
|
||
public record PageResult<T>(List<T> items, long total, int page, int pageSize) {}
|
||
```
|
||
|
||
所有 Controller 返回 `Result<T>`,禁止直接返回裸 POJO 或裸 List。分页接口返回 `Result<PageResult<T>>`。
|
||
|
||
### 3.2 全局异常处理
|
||
|
||
```java
|
||
// com/label/common/exception/GlobalExceptionHandler.java
|
||
@RestControllerAdvice
|
||
public class GlobalExceptionHandler {
|
||
|
||
@ExceptionHandler(BusinessException.class)
|
||
public ResponseEntity<Result<?>> handleBusiness(BusinessException ex) {
|
||
return ResponseEntity.status(ex.getHttpStatus())
|
||
.body(Result.fail(ex.getCode(), ex.getMessage()));
|
||
}
|
||
|
||
@ExceptionHandler(UnauthorizedException.class)
|
||
public ResponseEntity<Result<?>> handleUnauthorized() {
|
||
return ResponseEntity.status(403)
|
||
.body(Result.fail("FORBIDDEN", "权限不足"));
|
||
}
|
||
|
||
@ExceptionHandler(Exception.class)
|
||
public ResponseEntity<Result<?>> handleUnexpected(Exception ex, HttpServletRequest req) {
|
||
log.error("Unexpected error on {}", req.getRequestURI(), ex);
|
||
// 生产响应禁止包含堆栈跟踪
|
||
return ResponseEntity.status(500)
|
||
.body(Result.fail("INTERNAL_ERROR", "服务器内部错误"));
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.3 多租户 CompanyContext(ThreadLocal)
|
||
|
||
```java
|
||
// com/label/common/context/CompanyContext.java
|
||
public final class CompanyContext {
|
||
private static final ThreadLocal<Long> COMPANY_ID = new ThreadLocal<>();
|
||
|
||
public static void set(Long companyId) { COMPANY_ID.set(companyId); }
|
||
public static Long get() { return COMPANY_ID.get(); }
|
||
public static void clear() { COMPANY_ID.remove(); } // 必须在 finally 块调用
|
||
}
|
||
```
|
||
|
||
在 Shiro `TokenFilter` 的 `executeLogin` 成功后注入;在过滤器的 `finally` 块调用 `CompanyContext.clear()`,防止线程池复用时数据串漏至其他租户。
|
||
|
||
**禁止**调用方通过请求体传入 `company_id` 作为参数。所有 Service 通过 `CompanyContext.get()` 获取。
|
||
|
||
MyBatis Plus 配置 `TenantLineInnerInterceptor` 自动向所有 Mapper 查询注入 `company_id = ?` 条件:
|
||
|
||
```java
|
||
@Bean
|
||
public MybatisPlusInterceptor mybatisPlusInterceptor() {
|
||
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
|
||
interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(
|
||
new TenantLineHandler() {
|
||
public Expression getTenantId() {
|
||
return new LongValue(CompanyContext.get());
|
||
}
|
||
public String getTenantIdColumn() { return "company_id"; }
|
||
}
|
||
));
|
||
return interceptor;
|
||
}
|
||
```
|
||
|
||
### 3.4 Shiro 配置
|
||
|
||
#### TokenFilter(继承 AuthenticatingFilter)
|
||
|
||
```java
|
||
// com/label/common/shiro/TokenFilter.java
|
||
public class TokenFilter extends AuthenticatingFilter {
|
||
|
||
@Override
|
||
protected AuthenticationToken createToken(ServletRequest req, ServletResponse res) {
|
||
String token = extractToken((HttpServletRequest) req);
|
||
return new TokenAuthenticationToken(token);
|
||
}
|
||
|
||
@Override
|
||
protected boolean onAccessDenied(ServletRequest req, ServletResponse res) throws Exception {
|
||
String token = extractToken((HttpServletRequest) req);
|
||
if (token == null) {
|
||
sendUnauthorized(res, "缺少 Token");
|
||
return false;
|
||
}
|
||
return executeLogin(req, res);
|
||
}
|
||
|
||
@Override
|
||
protected boolean onLoginSuccess(AuthenticationToken token, Subject subject,
|
||
ServletRequest req, ServletResponse res) throws Exception {
|
||
UserSession session = (UserSession) subject.getPrincipal();
|
||
// 滑动过期:每次有效请求重置 TTL
|
||
long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
|
||
redisTemplate.expire(RedisKeyManager.tokenKey(session.getToken()), ttl, SECONDS);
|
||
// 注入租户上下文(在整个 Filter 链执行期间有效,由 doFilterInternal 的 finally 清理)
|
||
CompanyContext.set(session.getCompanyId());
|
||
return true; // 继续执行 Filter 链(进入 Controller)
|
||
}
|
||
|
||
// ⚠️ 必须在此处清理 ThreadLocal,而非 onLoginSuccess——
|
||
// onLoginSuccess 返回 true 后 Filter 链才继续执行,此时 Controller 尚未运行。
|
||
// doFilterInternal 的 finally 块保证请求结束后(无论正常还是异常)都清理上下文。
|
||
@Override
|
||
protected void doFilterInternal(ServletRequest req, ServletResponse res, FilterChain chain)
|
||
throws ServletException, IOException {
|
||
try {
|
||
super.doFilterInternal(req, res, chain);
|
||
} finally {
|
||
CompanyContext.clear();
|
||
}
|
||
}
|
||
|
||
private String extractToken(HttpServletRequest req) {
|
||
String header = req.getHeader("Authorization");
|
||
if (header != null && header.startsWith("Bearer ")) {
|
||
return header.substring(7);
|
||
}
|
||
return null;
|
||
}
|
||
}
|
||
```
|
||
|
||
#### UserRealm(继承 AuthorizingRealm)
|
||
|
||
```java
|
||
// com/label/common/shiro/UserRealm.java
|
||
public class UserRealm extends AuthorizingRealm {
|
||
|
||
// 认证:从 Redis 验证 token:{uuid} 是否存在
|
||
@Override
|
||
protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) {
|
||
String uuid = ((TokenAuthenticationToken) token).getToken();
|
||
Map<Object, Object> data = redisTemplate.opsForHash()
|
||
.entries(RedisKeyManager.tokenKey(uuid));
|
||
if (data.isEmpty()) throw new UnknownAccountException("Token 无效或已过期");
|
||
UserSession session = UserSession.from(uuid, data);
|
||
return new SimpleAuthenticationInfo(session, uuid, getName());
|
||
}
|
||
|
||
// 鉴权:先查 Redis user:perm:{userId},未命中查 PostgreSQL
|
||
@Override
|
||
protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
|
||
UserSession session = (UserSession) principals.getPrimaryPrincipal();
|
||
String permKey = RedisKeyManager.userPermKey(session.getUserId());
|
||
String cachedRole = (String) redisTemplate.opsForValue().get(permKey);
|
||
if (cachedRole == null) {
|
||
SysUser user = sysUserMapper.selectById(session.getUserId());
|
||
cachedRole = user.getRole();
|
||
redisTemplate.opsForValue().set(permKey, cachedRole, 5, MINUTES);
|
||
}
|
||
SimpleAuthorizationInfo info = new SimpleAuthorizationInfo();
|
||
info.addRole(cachedRole);
|
||
// 角色继承:高级角色包含所有低级角色权限
|
||
addInheritedRoles(info, cachedRole);
|
||
return info;
|
||
}
|
||
|
||
private void addInheritedRoles(SimpleAuthorizationInfo info, String role) {
|
||
// ADMIN ⊃ REVIEWER ⊃ ANNOTATOR ⊃ UPLOADER
|
||
switch (role) {
|
||
case "ADMIN" -> info.addRoles(Set.of("REVIEWER", "ANNOTATOR", "UPLOADER"));
|
||
case "REVIEWER" -> info.addRoles(Set.of("ANNOTATOR", "UPLOADER"));
|
||
case "ANNOTATOR" -> info.addRoles(Set.of("UPLOADER"));
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
#### ShiroConfig 过滤器链
|
||
|
||
```java
|
||
@Bean
|
||
public ShiroFilterChainDefinition shiroFilterChainDefinition() {
|
||
DefaultShiroFilterChainDefinition chain = new DefaultShiroFilterChainDefinition();
|
||
chain.addPathDefinition("/api/auth/login", "anon");
|
||
chain.addPathDefinition("/api/**", "tokenFilter");
|
||
return chain;
|
||
}
|
||
```
|
||
|
||
权限声明使用注解,禁止在 Service 内使用 if-role 临时判断:
|
||
|
||
```java
|
||
@RequiresRoles("ADMIN")
|
||
public void createTask(...)
|
||
|
||
@RequiresRoles(value = {"ANNOTATOR", "REVIEWER", "ADMIN"}, logical = Logical.OR)
|
||
public void claimTask(...)
|
||
```
|
||
|
||
#### 角色变更立即驱逐缓存
|
||
|
||
```java
|
||
// UserService.updateRole() 内
|
||
@Transactional
|
||
public void updateRole(Long userId, String newRole) {
|
||
userMapper.updateRole(userId, newRole);
|
||
// 立即驱逐,不等 TTL 自然过期(延迟在禁用高权限账号时存在安全窗口)
|
||
redisTemplate.delete(RedisKeyManager.userPermKey(userId));
|
||
}
|
||
```
|
||
|
||
### 3.5 Redis Key 管理
|
||
|
||
```java
|
||
// com/label/common/redis/RedisKeyManager.java
|
||
public final class RedisKeyManager {
|
||
public static String tokenKey(String uuid) { return "token:" + uuid; }
|
||
public static String userPermKey(Long userId) { return "user:perm:" + userId; }
|
||
public static String taskClaimKey(Long taskId){ return "task:claim:" + taskId; }
|
||
}
|
||
```
|
||
|
||
禁止在上述三类命名空间之外自造 Key 用于认证、权限或锁目的。
|
||
|
||
### 3.6 AOP 审计日志切面
|
||
|
||
```java
|
||
// com/label/common/aop/OperationLog.java(注解)
|
||
@Target(ElementType.METHOD)
|
||
@Retention(RetentionPolicy.RUNTIME)
|
||
public @interface OperationLog {
|
||
String type(); // 对应 operation_type 枚举值,如 "TASK_CLAIM"
|
||
String targetType() default "";
|
||
}
|
||
|
||
// com/label/common/aop/AuditAspect.java
|
||
@Aspect
|
||
@Component
|
||
public class AuditAspect {
|
||
|
||
@Around("@annotation(operationLog)")
|
||
public Object audit(ProceedingJoinPoint pjp, OperationLog operationLog) throws Throwable {
|
||
String result = "SUCCESS";
|
||
String errorMsg = null;
|
||
Object returnVal = null;
|
||
try {
|
||
returnVal = pjp.proceed();
|
||
} catch (Throwable ex) {
|
||
result = "FAIL";
|
||
errorMsg = ex.getMessage();
|
||
throw ex;
|
||
} finally {
|
||
// 业务事务已 commit/rollback 后,以独立操作写入审计记录
|
||
// 审计写入失败只记录 error 日志,禁止回滚业务事务
|
||
try {
|
||
saveAuditLog(operationLog.type(), operationLog.targetType(), result, errorMsg);
|
||
} catch (Exception auditEx) {
|
||
log.error("审计日志写入失败: type={}", operationLog.type(), auditEx);
|
||
}
|
||
}
|
||
return returnVal;
|
||
}
|
||
|
||
private void saveAuditLog(String type, String targetType, String result, String errorMsg) {
|
||
UserSession session = getCurrentSession();
|
||
SysOperationLog log = SysOperationLog.builder()
|
||
.companyId(CompanyContext.get())
|
||
.operatorId(session != null ? session.getUserId() : null)
|
||
.operatorName(session != null ? session.getUsername() : "unknown")
|
||
.operationType(type)
|
||
.targetType(targetType.isEmpty() ? null : targetType)
|
||
.result(result)
|
||
.errorMessage(errorMsg)
|
||
.ipAddress(getClientIp())
|
||
.build();
|
||
operationLogMapper.insert(log);
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.7 RustFS S3 客户端
|
||
|
||
RustFS 实现 S3 兼容接口,使用 AWS SDK for Java v2 连接:
|
||
|
||
```java
|
||
// com/label/common/storage/RustFsClient.java
|
||
@Component
|
||
public class RustFsClient {
|
||
|
||
private final S3Client s3Client; // 配置 endpoint 指向 RustFS 地址
|
||
|
||
public String upload(String bucket, String key, InputStream data, long size) {
|
||
PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).build();
|
||
s3Client.putObject(req, RequestBody.fromInputStream(data, size));
|
||
return key;
|
||
}
|
||
|
||
public InputStream download(String bucket, String key) {
|
||
GetObjectRequest req = GetObjectRequest.builder().bucket(bucket).key(key).build();
|
||
return s3Client.getObject(req);
|
||
}
|
||
|
||
public void delete(String bucket, String key) {
|
||
s3Client.deleteObject(b -> b.bucket(bucket).key(key));
|
||
}
|
||
|
||
public String getPresignedUrl(String bucket, String key, Duration expiry) {
|
||
S3Presigner presigner = S3Presigner.builder().endpointOverride(endpointUri).build();
|
||
return presigner.presignGetObject(b -> b.signatureDuration(expiry)
|
||
.getObjectRequest(r -> r.bucket(bucket).key(key))).url().toString();
|
||
}
|
||
}
|
||
```
|
||
|
||
**对象存储路径规范:**
|
||
|
||
| 资源类型 | 存储桶 | 路径格式 |
|
||
|----------|--------|----------|
|
||
| 上传文本文件 | `source-data` | `text/{yyyyMM}/{source_id}.txt` |
|
||
| 上传图片 | `source-data` | `image/{yyyyMM}/{source_id}.jpg` |
|
||
| 上传视频 | `source-data` | `video/{yyyyMM}/{source_id}.mp4` |
|
||
| 视频帧图 | `source-data` | `frames/{source_id}/{frame_index}.jpg` |
|
||
| 视频片段转译文本 | `source-data` | `video-text/{parent_source_id}/{timestamp}.txt` |
|
||
| bbox 裁剪图 | `source-data` | `crops/{task_id}/{item_index}.jpg` |
|
||
| 导出 JSONL | `finetune-export` | `export/{batchUuid}.jsonl` |
|
||
|
||
### 3.8 AI 服务 HTTP 客户端
|
||
|
||
Spring Boot 3 使用 `RestClient` 封装对 Python FastAPI 服务的同步 HTTP 调用:
|
||
|
||
```java
|
||
// com/label/common/ai/AiServiceClient.java
|
||
@Component
|
||
public class AiServiceClient {
|
||
|
||
private final RestClient restClient;
|
||
|
||
public AiServiceClient(@Value("${ai.service.base-url}") String baseUrl) {
|
||
this.restClient = RestClient.builder().baseUrl(baseUrl).build();
|
||
}
|
||
|
||
// POST /api/v1/text/extract
|
||
public TextExtractResponse extractText(String filePath, String model) {
|
||
return restClient.post().uri("/api/v1/text/extract")
|
||
.body(Map.of("file_path", filePath, "model", model))
|
||
.retrieve().body(TextExtractResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/image/extract
|
||
public ImageExtractResponse extractImage(String imagePath, String model) {
|
||
return restClient.post().uri("/api/v1/image/extract")
|
||
.body(Map.of("image_path", imagePath, "model", model))
|
||
.retrieve().body(ImageExtractResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/video/extract-frames
|
||
public VideoFramesResponse extractFrames(String videoPath, String frameInterval, String mode) {
|
||
return restClient.post().uri("/api/v1/video/extract-frames")
|
||
.body(Map.of("video_path", videoPath, "frame_interval", frameInterval, "mode", mode))
|
||
.retrieve().body(VideoFramesResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/video/to-text
|
||
public VideoToTextResponse videoToText(String videoPath, int startSec, int endSec, String model) {
|
||
return restClient.post().uri("/api/v1/video/to-text")
|
||
.body(Map.of("video_path", videoPath, "start_sec", startSec, "end_sec", endSec, "model", model))
|
||
.retrieve().body(VideoToTextResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/qa/gen-text
|
||
public QaGenResponse genTextQa(List<?> triples, String model, String promptTemplate) {
|
||
return restClient.post().uri("/api/v1/qa/gen-text")
|
||
.body(Map.of("items", triples, "model", model, "prompt_template", promptTemplate))
|
||
.retrieve().body(QaGenResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/qa/gen-image
|
||
public QaGenResponse genImageQa(List<?> quadruples, String model, String promptTemplate) {
|
||
return restClient.post().uri("/api/v1/qa/gen-image")
|
||
.body(Map.of("items", quadruples, "model", model, "prompt_template", promptTemplate))
|
||
.retrieve().body(QaGenResponse.class);
|
||
}
|
||
|
||
// POST /api/v1/finetune/start
|
||
public FinetuneStartResponse startFinetune(String jsonlUrl, String baseModel, Map<String, Object> params) {
|
||
return restClient.post().uri("/api/v1/finetune/start")
|
||
.body(Map.of("jsonl_url", jsonlUrl, "base_model", baseModel, "params", params))
|
||
.retrieve().body(FinetuneStartResponse.class);
|
||
}
|
||
|
||
// GET /api/v1/finetune/status/{jobId}
|
||
public FinetuneStatusResponse getFinetuneStatus(String jobId) {
|
||
return restClient.get().uri("/api/v1/finetune/status/{jobId}", jobId)
|
||
.retrieve().body(FinetuneStatusResponse.class);
|
||
}
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 四、业务模块纵切
|
||
|
||
### 4.1 用户与权限模块
|
||
|
||
**Entity(SysUser)**:字段同 DDL,`passwordHash` 字段加 `@JsonIgnore` 禁止序列化到响应。
|
||
|
||
**AuthService 核心逻辑:**
|
||
|
||
```java
|
||
// 登录
|
||
@OperationLog(type = "USER_LOGIN")
|
||
public String login(Long companyId, String username, String password, String ip) {
|
||
SysUser user = userMapper.selectByCompanyAndUsername(companyId, username);
|
||
if (user == null || !BCrypt.checkpw(password, user.getPasswordHash()))
|
||
throw new BusinessException("USER_NOT_FOUND", "用户名或密码错误");
|
||
if ("DISABLED".equals(user.getStatus()))
|
||
throw new BusinessException("USER_DISABLED", "账号已禁用");
|
||
|
||
String token = UUID.randomUUID().toString();
|
||
Map<String, Object> tokenData = Map.of(
|
||
"userId", user.getId(), "role", user.getRole(), "companyId", user.getCompanyId(),
|
||
"username", user.getUsername()
|
||
);
|
||
long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
|
||
redisTemplate.opsForHash().putAll(RedisKeyManager.tokenKey(token), tokenData);
|
||
redisTemplate.expire(RedisKeyManager.tokenKey(token), ttl, SECONDS);
|
||
return token;
|
||
}
|
||
|
||
// 退出登录
|
||
@OperationLog(type = "USER_LOGOUT")
|
||
public void logout(String token) {
|
||
redisTemplate.delete(RedisKeyManager.tokenKey(token));
|
||
}
|
||
```
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| POST | `/api/auth/login` | 匿名 | 返回 Token(UUID) |
|
||
| POST | `/api/auth/logout` | 已登录 | 删除 Redis Token |
|
||
| GET | `/api/auth/me` | 已登录 | 当前用户信息与角色 |
|
||
| GET | `/api/users` | ADMIN | 分页查询用户列表 |
|
||
| POST | `/api/users` | ADMIN | 创建用户 |
|
||
| PUT | `/api/users/{id}` | ADMIN | 更新用户信息 |
|
||
| PUT | `/api/users/{id}/status` | ADMIN | 启用/禁用账号(同步驱逐权限缓存) |
|
||
| PUT | `/api/users/{id}/role` | ADMIN | 变更角色(同步驱逐权限缓存) |
|
||
|
||
---
|
||
|
||
### 4.2 资料管理模块
|
||
|
||
**SourceService 核心逻辑:**
|
||
|
||
```java
|
||
@OperationLog(type = "SOURCE_UPLOAD")
|
||
@Transactional
|
||
public SourceData upload(MultipartFile file, String dataType) {
|
||
Long companyId = CompanyContext.get();
|
||
// 1. 先创建 source_data 记录获取 ID(用于构造存储路径)
|
||
SourceData sd = SourceData.builder()
|
||
.companyId(companyId).uploaderId(getCurrentUserId())
|
||
.dataType(dataType).fileName(file.getOriginalFilename())
|
||
.fileSize(file.getSize()).bucketName("source-data")
|
||
.status("PENDING").build();
|
||
sourceDataMapper.insert(sd);
|
||
|
||
// 2. 构造路径并上传至 RustFS
|
||
String path = buildPath(dataType, sd.getId());
|
||
rustFsClient.upload("source-data", path, file.getInputStream(), file.getSize());
|
||
|
||
// 3. 更新 file_path
|
||
sd.setFilePath(path);
|
||
sourceDataMapper.updateById(sd);
|
||
return sd;
|
||
}
|
||
|
||
private String buildPath(String dataType, Long sourceId) {
|
||
String ym = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
|
||
return switch (dataType) {
|
||
case "TEXT" -> "text/" + ym + "/" + sourceId + ".txt";
|
||
case "IMAGE" -> "image/" + ym + "/" + sourceId + ".jpg";
|
||
case "VIDEO" -> "video/" + ym + "/" + sourceId + ".mp4";
|
||
default -> throw new BusinessException("INVALID_TYPE", "不支持的资料类型");
|
||
};
|
||
}
|
||
```
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| POST | `/api/source/upload` | UPLOADER | 上传文件,创建 source_data 记录 |
|
||
| GET | `/api/source/list` | UPLOADER | 分页查询(UPLOADER 只见自己,ADMIN 见全部) |
|
||
| GET | `/api/source/{id}` | UPLOADER | 查看资料详情 |
|
||
| DELETE | `/api/source/{id}` | ADMIN | 删除资料及 RustFS 文件 |
|
||
|
||
---
|
||
|
||
### 4.3 任务管理模块(含并发控制)
|
||
|
||
**TaskClaimService.claim() — 双重保障:**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "TASK_CLAIM")
|
||
public void claim(Long taskId) {
|
||
Long userId = getCurrentUserId();
|
||
// 第一重:Redis 分布式锁(TTL 30s,SET NX)
|
||
Boolean locked = redisTemplate.opsForValue()
|
||
.setIfAbsent(RedisKeyManager.taskClaimKey(taskId), userId.toString(), 30, SECONDS);
|
||
if (!Boolean.TRUE.equals(locked))
|
||
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取");
|
||
|
||
// 第二重:数据库乐观约束(WHERE status = 'UNCLAIMED')
|
||
// UPDATE annotation_task SET status='IN_PROGRESS', claimed_by=?, claimed_at=NOW()
|
||
// WHERE id=? AND status='UNCLAIMED' AND company_id=?
|
||
int affected = taskMapper.claimTask(taskId, userId, CompanyContext.get());
|
||
if (affected == 0)
|
||
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取");
|
||
|
||
// 写入任务历史(同一事务)
|
||
insertHistory(taskId, "UNCLAIMED", "IN_PROGRESS", userId, null);
|
||
}
|
||
|
||
public void unclaim(Long taskId) {
|
||
StateValidator.assertTransition(TaskStatus.IN_PROGRESS, TaskStatus.UNCLAIMED, TaskStatus.TRANSITIONS);
|
||
// 更新 claimed_by = NULL, status = UNCLAIMED
|
||
taskMapper.unclaim(taskId, getCurrentUserId(), CompanyContext.get());
|
||
redisTemplate.delete(RedisKeyManager.taskClaimKey(taskId));
|
||
insertHistory(taskId, "IN_PROGRESS", "UNCLAIMED", getCurrentUserId(), null);
|
||
}
|
||
```
|
||
|
||
**每次 status 变更必须在同一事务中调用 `insertHistory()` 写入 `annotation_task_history`。**
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| POST | `/api/tasks` | ADMIN | 为指定 source 创建 EXTRACTION 任务 |
|
||
| GET | `/api/tasks/pool` | ANNOTATOR | 查看可领取任务列表(按角色过滤,分页)。**角色过滤规则**:ANNOTATOR 返回 `phase=EXTRACTION AND status=UNCLAIMED`;REVIEWER 返回 `phase=EXTRACTION AND status=UNCLAIMED` ∪ `phase=QA_GENERATION AND status=UNCLAIMED`(REVIEWER 含 ANNOTATOR 继承权限,可领取两类任务);ADMIN 返回所有 phase、所有 status 的任务(走 `GET /api/tasks`)。 |
|
||
| GET | `/api/tasks/pending-review` | REVIEWER | **审批收件箱**:返回 `status=SUBMITTED` 的任务列表(分页)。REVIEWER 看本公司全部待审批任务;ADMIN 同等权限。ANNOTATOR 无权访问此接口(403)。 |
|
||
| POST | `/api/tasks/{id}/claim` | ANNOTATOR | 领取任务(争抢式) |
|
||
| POST | `/api/tasks/{id}/unclaim` | ANNOTATOR | 放弃任务,退回任务池 |
|
||
| POST | `/api/tasks/{id}/reclaim` | ANNOTATOR | **重拾被驳回的任务**:仅当 `task.status=REJECTED AND task.claimed_by=currentUserId` 时合法;将 status 置为 IN_PROGRESS(状态机 REJECTED → IN_PROGRESS);写入任务历史。 |
|
||
| GET | `/api/tasks/mine` | ANNOTATOR | 查询我领取的任务列表(分页),**包含 REJECTED 状态**(让标注员发现被驳回的任务)。 |
|
||
| GET | `/api/tasks/{id}` | ANNOTATOR | 查看任务详情 |
|
||
| GET | `/api/tasks` | ADMIN | 查询全部任务(支持过滤,分页) |
|
||
| PUT | `/api/tasks/{id}/reassign` | ADMIN | 强制转移任务归属(更新 claimed_by,task status 保持 IN_PROGRESS,向 annotation_task_history 写入 from=IN_PROGRESS / to=IN_PROGRESS + note=转移原因) |
|
||
|
||
---
|
||
|
||
### 4.4 标注工作台模块(EXTRACTION 阶段)
|
||
|
||
**ExtractionService 核心逻辑:**
|
||
|
||
```java
|
||
// AI 辅助预标注(标注员领取任务后调用)
|
||
public AnnotationResult aiPreAnnotate(Long taskId) {
|
||
AnnotationTask task = taskMapper.selectById(taskId);
|
||
SourceData source = sourceDataMapper.selectById(task.getSourceId());
|
||
Object aiResult = "IMAGE".equals(source.getDataType()) || task.getVideoUnitType() != null
|
||
? aiServiceClient.extractImage(resolveImagePath(task), task.getAiModel())
|
||
: aiServiceClient.extractText(source.getFilePath(), task.getAiModel());
|
||
AnnotationResult result = buildAnnotationResult(task, aiResult);
|
||
annotationResultMapper.insertOrReplace(result);
|
||
return result;
|
||
}
|
||
|
||
// 更新提取结果(整体覆盖,PUT 语义)
|
||
public void updateResult(Long taskId, String resultJsonStr) {
|
||
// 验证 JSON 格式合法性
|
||
validateResultJson(resultJsonStr);
|
||
// 整体替换 result_json,禁止局部 PATCH 或逐条追加
|
||
annotationResultMapper.updateResultJson(taskId, resultJsonStr, CompanyContext.get());
|
||
}
|
||
|
||
// 审批通过——两阶段设计:
|
||
// 阶段一(同步 @Transactional):完成审批核心动作(步骤 1-3),立即返回
|
||
// 阶段二(异步事件):AI 调用 + QA 任务创建(步骤 4-7)由 ExtractionApprovedEvent 驱动
|
||
// ⚠️ 禁止将 AI HTTP 调用放入 @Transactional 内:会长期占用 DB 连接,且 AI 失败会回滚已完成的审批动作
|
||
// ⚠️ 自审防护:提交人(claimed_by)不能同时作为审批人
|
||
@Transactional
|
||
@OperationLog(type = "EXTRACTION_APPROVE")
|
||
public void approve(Long taskId) {
|
||
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
|
||
if (task.getClaimedBy().equals(getCurrentUserId()))
|
||
throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能审批自己提交的任务");
|
||
AnnotationResult result = annotationResultMapper.selectByTaskId(taskId);
|
||
|
||
// 1. annotation_result.is_final = true
|
||
result.setIsFinal(true);
|
||
annotationResultMapper.updateById(result);
|
||
|
||
// 2. annotation_task.status → APPROVED
|
||
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS);
|
||
task.setStatus("APPROVED");
|
||
task.setCompletedAt(LocalDateTime.now());
|
||
taskMapper.updateById(task);
|
||
|
||
// 3. 写入任务历史
|
||
insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);
|
||
|
||
// ---- 事务边界 ----
|
||
// 步骤 4-7 通过 Spring ApplicationEvent 在事务提交后异步执行,
|
||
// 避免 AI HTTP 调用阻塞事务(参见 ExtractionApprovedEventListener)
|
||
applicationEventPublisher.publishEvent(new ExtractionApprovedEvent(taskId, task.getSourceId()));
|
||
}
|
||
|
||
// ExtractionApprovedEventListener(@TransactionalEventListener(phase = AFTER_COMMIT),@Async)
|
||
// 4. 调用 AI 生成候选问答对
|
||
// String promptKey = "IMAGE".equals(getSourceType(task)) ? "prompt_qa_gen_image" : "prompt_qa_gen_text";
|
||
// QaGenResponse qaResponse = generateQa(task, result, promptTemplate);
|
||
//
|
||
// 5. 将候选问答对写入 training_dataset(PENDING_REVIEW)
|
||
//
|
||
// 6. 创建 QA_GENERATION 阶段任务(UNCLAIMED)
|
||
//
|
||
// 7. source_data.status → QA_REVIEW
|
||
```
|
||
|
||
**ExtractionService.reject() 核心逻辑:**
|
||
|
||
```java
|
||
// 驳回——task 回退至 REJECTED,由标注员重新 claim 后进入 IN_PROGRESS
|
||
@Transactional
|
||
@OperationLog(type = "EXTRACTION_REJECT")
|
||
public void reject(Long taskId, String reason) {
|
||
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
|
||
if (task.getClaimedBy().equals(getCurrentUserId()))
|
||
throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能驳回自己提交的任务");
|
||
|
||
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS);
|
||
task.setStatus("REJECTED");
|
||
task.setRejectReason(reason);
|
||
taskMapper.updateById(task);
|
||
insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason);
|
||
// ⚠️ source_data 状态保持 EXTRACTING(不回退),等标注员重新 claim 后继续修改
|
||
}
|
||
```
|
||
|
||
> **source_data EXTRACTING 状态说明**:任务从任务池创建(`ADMIN POST /api/tasks`)时,后端同步将 `source_data.status → EXTRACTING`。EXTRACTION 审批驳回后 source_data 保持 EXTRACTING 不变,标注员重新领取修改后重提,审批通过后再推进至 QA_REVIEW。
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| GET | `/api/extraction/{taskId}` | ANNOTATOR | 获取当前提取结果(含 AI 预标注) |
|
||
| PUT | `/api/extraction/{taskId}` | ANNOTATOR | 更新提取结果(整体 JSONB 覆盖) |
|
||
| POST | `/api/extraction/{taskId}/submit` | ANNOTATOR | 提交提取结果,进入审批队列 |
|
||
| POST | `/api/extraction/{taskId}/approve` | REVIEWER | 审批通过(自审防护:不能审批自己提交的任务),自动触发 QA 任务创建 |
|
||
| POST | `/api/extraction/{taskId}/reject` | REVIEWER | 驳回(自审防护同上),附驳回原因;task → REJECTED,标注员需重新 claim |
|
||
|
||
---
|
||
|
||
### 4.5 问答生成模块(QA_GENERATION 阶段)
|
||
|
||
`QaService` 的整体覆盖逻辑与 `ExtractionService` 一致(PUT 语义,禁止局部 PATCH)。
|
||
|
||
**QaService.reject() 核心逻辑:**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "QA_REJECT")
|
||
public void reject(Long taskId, String reason) {
|
||
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
|
||
if (task.getClaimedBy().equals(getCurrentUserId()))
|
||
throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能驳回自己提交的任务");
|
||
|
||
// training_dataset.status → REJECTED(标注员修改后重提,状态回 PENDING_REVIEW)
|
||
trainingDatasetMapper.rejectByTaskId(taskId, reason, CompanyContext.get());
|
||
|
||
StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.REJECTED, TaskStatus.TRANSITIONS);
|
||
task.setStatus("REJECTED");
|
||
task.setRejectReason(reason);
|
||
taskMapper.updateById(task);
|
||
insertHistory(taskId, "SUBMITTED", "REJECTED", getCurrentUserId(), reason);
|
||
// source_data 保持 QA_REVIEW,等标注员重新领取修改后重提
|
||
}
|
||
```
|
||
|
||
**approve 级联动作(同一事务):**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "QA_APPROVE")
|
||
public void approve(Long taskId) {
|
||
AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
|
||
if (task.getClaimedBy().equals(getCurrentUserId()))
|
||
throw new BusinessException("SELF_REVIEW_NOT_ALLOWED", "不能审批自己提交的任务");
|
||
|
||
// 1. training_dataset.status → APPROVED
|
||
trainingDatasetMapper.approveByTaskId(taskId, getCurrentUserId(), CompanyContext.get());
|
||
|
||
// 2. annotation_task.status → APPROVED
|
||
task.setStatus("APPROVED");
|
||
task.setCompletedAt(LocalDateTime.now());
|
||
taskMapper.updateById(task);
|
||
|
||
// 3. source_data.status → APPROVED(整条流水线完成)
|
||
sourceDataMapper.updateStatus(task.getSourceId(), "APPROVED", CompanyContext.get());
|
||
|
||
// 4. 写入任务历史
|
||
insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);
|
||
}
|
||
```
|
||
|
||
**接口清单:**
|
||
|
||
| 方法 | 路径 | 最低权限 | 说明 |
|
||
|------|------|----------|------|
|
||
| GET | `/api/qa/{taskId}` | ANNOTATOR | 获取候选问答对列表 |
|
||
| PUT | `/api/qa/{taskId}` | ANNOTATOR | 修改问答对(整体覆盖) |
|
||
| POST | `/api/qa/{taskId}/submit` | ANNOTATOR | 提交问答对,进入审批队列 |
|
||
| POST | `/api/qa/{taskId}/approve` | REVIEWER | 审批通过(自审防护),training_dataset → APPROVED,source_data → APPROVED |
|
||
| POST | `/api/qa/{taskId}/reject` | REVIEWER | 驳回(自审防护),training_dataset → REJECTED;标注员需重新 claim 修改后重提(重提后 training_dataset → PENDING_REVIEW) |
|
||
|
||
---
|
||
|
||
### 4.6 训练数据导出模块
|
||
|
||
**ExportService.createBatch() 核心逻辑:**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "EXPORT_CREATE")
|
||
public ExportBatch createBatch(List<Long> sampleIds) {
|
||
Long companyId = CompanyContext.get();
|
||
// 1. 校验样本均为 APPROVED 状态
|
||
List<TrainingDataset> samples = trainingDatasetMapper
|
||
.selectByIdsAndStatus(sampleIds, "APPROVED", companyId);
|
||
if (samples.size() != sampleIds.size())
|
||
throw new BusinessException("INVALID_SAMPLES", "部分样本不处于 APPROVED 状态");
|
||
|
||
// 2. 生成 JSONL 内容(每行一个 glm_format_json)
|
||
String batchUuid = UUID.randomUUID().toString();
|
||
String jsonl = samples.stream()
|
||
.map(s -> s.getGlmFormatJson().toString())
|
||
.collect(Collectors.joining("\n"));
|
||
|
||
// 3. 上传至 RustFS:finetune-export/export/{batchUuid}.jsonl
|
||
String path = "export/" + batchUuid + ".jsonl";
|
||
byte[] bytes = jsonl.getBytes(StandardCharsets.UTF_8);
|
||
rustFsClient.upload("finetune-export", path, new ByteArrayInputStream(bytes), bytes.length);
|
||
|
||
// 4. 更新样本 export_batch_id 与 exported_at
|
||
trainingDatasetMapper.batchUpdateExportInfo(sampleIds, batchUuid, LocalDateTime.now());
|
||
|
||
// 5. 写入 export_batch 记录
|
||
ExportBatch batch = ExportBatch.builder()
|
||
.companyId(companyId).batchUuid(batchUuid).datasetFilePath(path)
|
||
.sampleCount(samples.size()).finetuneStatus("NOT_STARTED")
|
||
.createdBy(getCurrentUserId()).build();
|
||
exportBatchMapper.insert(batch);
|
||
return batch;
|
||
}
|
||
```
|
||
|
||
**FinetuneService.trigger():** 调用 `aiServiceClient.startFinetune(...)` 获取 `glmJobId`,更新 `export_batch.glm_job_id` 和 `finetune_status = RUNNING`。
|
||
|
||
**接口清单(全部需要 ADMIN 权限):**
|
||
|
||
| 方法 | 路径 | 说明 |
|
||
|------|------|------|
|
||
| GET | `/api/training/samples` | 分页查询已审批通过、待导出的样本 |
|
||
| POST | `/api/export/batch` | 创建导出批次,合并 JSONL 并上传 RustFS |
|
||
| POST | `/api/export/{batchId}/finetune` | 向 GLM 工厂提交微调任务 |
|
||
| GET | `/api/export/{batchId}/status` | 查询微调任务状态 |
|
||
| GET | `/api/export/list` | 分页查询所有导出批次 |
|
||
|
||
---
|
||
|
||
### 4.7 系统配置模块
|
||
|
||
**SysConfigService.get(configKey):** 先按 `(companyId, configKey)` 查,未命中则按 `(NULL, configKey)` 查全局默认值,公司级配置优先覆盖全局值。
|
||
|
||
```java
|
||
public String get(String configKey) {
|
||
Long companyId = CompanyContext.get();
|
||
// 先查公司级配置
|
||
SysConfig cfg = configMapper.selectByCompanyAndKey(companyId, configKey);
|
||
if (cfg == null) {
|
||
// 回退到全局默认(company_id IS NULL)
|
||
cfg = configMapper.selectByCompanyAndKey(null, configKey);
|
||
}
|
||
return cfg != null ? cfg.getConfigValue() : null;
|
||
}
|
||
```
|
||
|
||
**接口清单(全部需要 ADMIN 权限):**
|
||
|
||
| 方法 | 路径 | 说明 |
|
||
|------|------|------|
|
||
| GET | `/api/config` | 获取所有配置项(公司级 + 全局默认) |
|
||
| PUT | `/api/config/{key}` | 更新单项配置(含 Prompt 模板) |
|
||
|
||
---
|
||
|
||
### 4.8 视频处理模块
|
||
|
||
**接口清单(全部需要 ADMIN 权限):**
|
||
|
||
| 方法 | 路径 | 说明 |
|
||
|------|------|------|
|
||
| POST | `/api/video/{sourceId}/process-frames` | 触发帧模式预处理(创建 FRAME_EXTRACT 任务,source_data → PREPROCESSING) |
|
||
| POST | `/api/video/{sourceId}/process-to-text` | 触发片段模式预处理(创建 VIDEO_TO_TEXT 任务,source_data → PREPROCESSING) |
|
||
| GET | `/api/video/{sourceId}/jobs` | 查询该视频的所有异步处理任务列表 |
|
||
| PUT | `/api/video/jobs/{jobId}/reset` | 将 FAILED 状态的任务手动重置为 PENDING(仅 ADMIN 可操作) |
|
||
| POST | `/api/video/jobs/{jobId}/callback` | AI 服务回调接口(内网调用,不经过 Shiro Token 验证,通过 IP 白名单保护) |
|
||
|
||
**VideoProcessService 核心逻辑:**
|
||
|
||
```java
|
||
@Transactional
|
||
@OperationLog(type = "TASK_CREATE")
|
||
public VideoProcessJob createJob(Long sourceId, String jobType, JsonNode params) {
|
||
Long companyId = CompanyContext.get();
|
||
// 1. source_data.status → PREPROCESSING
|
||
sourceDataMapper.updateStatus(sourceId, "PREPROCESSING", companyId);
|
||
// 2. 创建 video_process_job(PENDING)
|
||
VideoProcessJob job = VideoProcessJob.builder()
|
||
.companyId(companyId).sourceId(sourceId)
|
||
.jobType(jobType).params(params).status("PENDING").build();
|
||
videoProcessJobMapper.insert(job);
|
||
// 3. 异步调用 AI 服务(非阻塞,由 AI 服务自行回调)
|
||
triggerAiAsync(job);
|
||
return job;
|
||
}
|
||
|
||
// AI 服务回调:幂等处理
|
||
@Transactional
|
||
public void handleCallback(Long jobId, boolean success, String outputPath, String errorMsg) {
|
||
VideoProcessJob job = videoProcessJobMapper.selectById(jobId);
|
||
// 幂等:已是 SUCCESS 状态则静默忽略重复回调,不得重复创建 annotation_task
|
||
if ("SUCCESS".equals(job.getStatus())) return;
|
||
|
||
if (success) {
|
||
job.setStatus("SUCCESS");
|
||
job.setOutputPath(outputPath);
|
||
job.setCompletedAt(LocalDateTime.now());
|
||
// source_data.status → PENDING(进入后续标注流程)
|
||
sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
|
||
|
||
// ⚠️ 两种模式的后续任务创建(ADMIN 看到 PENDING 后可手动创建标注任务)
|
||
if ("FRAME_EXTRACT".equals(job.getJobType())) {
|
||
// 帧模式:AI 返回帧列表,每帧对应一个 annotation_task(phase=EXTRACTION,video_unit_type=FRAME)
|
||
// outputPath 指向帧列表 JSON 文件(存 RustFS),由 ADMIN 在 POST /api/tasks 时按帧创建任务
|
||
// 不在此处自动创建 EXTRACTION 任务——ADMIN 决定如何调度多帧任务
|
||
} else if ("VIDEO_TO_TEXT".equals(job.getJobType())) {
|
||
// 片段模式:派生 TEXT 类型 source_data,parent_source_id 指向原视频
|
||
// 不在此处创建——ADMIN 收到 PENDING 通知后手动通过 POST /api/tasks 为派生 TEXT 资料创建任务
|
||
// 派生 source_data 已在 triggerAiAsync() 中预创建(status=PENDING)
|
||
}
|
||
} else {
|
||
if (job.getRetryCount() >= job.getMaxRetries()) {
|
||
// 达最大重试次数,置 FAILED,source_data → PENDING 保留 error_message,需 ADMIN 手动重置
|
||
job.setStatus("FAILED");
|
||
job.setErrorMessage(errorMsg);
|
||
sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
|
||
} else {
|
||
job.setStatus("RETRYING");
|
||
job.setRetryCount(job.getRetryCount() + 1);
|
||
job.setErrorMessage(errorMsg);
|
||
}
|
||
}
|
||
videoProcessJobMapper.updateById(job);
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 五、状态机实现规范
|
||
|
||
所有状态变更**必须**经过 `StateValidator.assertTransition()` 校验,禁止绕过直接调用 Mapper 更新状态字段。
|
||
|
||
### 5.1 StateValidator
|
||
|
||
```java
|
||
// com/label/common/statemachine/StateValidator.java
|
||
public final class StateValidator {
|
||
public static <S> void assertTransition(S from, S to, Map<S, Set<S>> transitions) {
|
||
Set<S> allowed = transitions.getOrDefault(from, Set.of());
|
||
if (!allowed.contains(to))
|
||
throw new BusinessException("INVALID_STATE_TRANSITION",
|
||
"非法状态转换: " + from + " → " + to);
|
||
}
|
||
}
|
||
```
|
||
|
||
### 5.2 source_data 状态机
|
||
|
||
```java
|
||
public enum SourceStatus {
|
||
PENDING, PREPROCESSING, EXTRACTING, QA_REVIEW, APPROVED, REJECTED;
|
||
|
||
public static final Map<SourceStatus, Set<SourceStatus>> TRANSITIONS = Map.of(
|
||
PENDING, Set.of(EXTRACTING, PREPROCESSING),
|
||
PREPROCESSING, Set.of(PENDING),
|
||
EXTRACTING, Set.of(QA_REVIEW),
|
||
QA_REVIEW, Set.of(APPROVED)
|
||
// ⚠️ source_data 不会进入 REJECTED 状态:
|
||
// QA 被驳回时 annotation_task → REJECTED,source_data 保持 QA_REVIEW。
|
||
// 整条流水线唯一终态为 APPROVED。
|
||
);
|
||
}
|
||
```
|
||
|
||
### 5.3 annotation_task 状态机
|
||
|
||
```java
|
||
public enum TaskStatus {
|
||
UNCLAIMED, IN_PROGRESS, SUBMITTED, APPROVED, REJECTED;
|
||
|
||
public static final Map<TaskStatus, Set<TaskStatus>> TRANSITIONS = Map.of(
|
||
UNCLAIMED, Set.of(IN_PROGRESS),
|
||
IN_PROGRESS, Set.of(SUBMITTED, UNCLAIMED, IN_PROGRESS),
|
||
// IN_PROGRESS → IN_PROGRESS 用于 ADMIN 强制转移(持有人变更,状态不变)
|
||
SUBMITTED, Set.of(APPROVED, REJECTED),
|
||
REJECTED, Set.of(IN_PROGRESS) // 驳回后重拾
|
||
);
|
||
}
|
||
```
|
||
|
||
### 5.4 training_dataset 状态机
|
||
|
||
```java
|
||
public enum DatasetStatus {
|
||
PENDING_REVIEW, APPROVED, REJECTED;
|
||
|
||
public static final Map<DatasetStatus, Set<DatasetStatus>> TRANSITIONS = Map.of(
|
||
PENDING_REVIEW, Set.of(APPROVED, REJECTED),
|
||
REJECTED, Set.of(PENDING_REVIEW) // 驳回后可修改重提
|
||
);
|
||
}
|
||
```
|
||
|
||
### 5.5 video_process_job 状态机
|
||
|
||
```java
|
||
public enum VideoJobStatus {
|
||
PENDING, RUNNING, SUCCESS, FAILED, RETRYING;
|
||
|
||
public static final Map<VideoJobStatus, Set<VideoJobStatus>> TRANSITIONS = Map.of(
|
||
PENDING, Set.of(RUNNING),
|
||
RUNNING, Set.of(SUCCESS, RETRYING, FAILED),
|
||
RETRYING, Set.of(RUNNING, FAILED)
|
||
// FAILED → PENDING 由 ADMIN 手动触发接口,不在此处声明自动流转
|
||
);
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 六、Docker Compose 配置
|
||
|
||
```yaml
|
||
# docker-compose.yml
|
||
version: "3.9"
|
||
|
||
services:
|
||
postgres:
|
||
image: postgres:16-alpine
|
||
environment:
|
||
POSTGRES_DB: label_db
|
||
POSTGRES_USER: label
|
||
POSTGRES_PASSWORD: label_password
|
||
volumes:
|
||
- postgres_data:/var/lib/postgresql/data
|
||
- ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
|
||
ports:
|
||
- "5432:5432"
|
||
healthcheck:
|
||
test: ["CMD-SHELL", "pg_isready -U label -d label_db"]
|
||
interval: 10s
|
||
timeout: 5s
|
||
retries: 5
|
||
|
||
redis:
|
||
image: redis:7-alpine
|
||
command: redis-server --requirepass redis_password
|
||
volumes:
|
||
- redis_data:/data
|
||
ports:
|
||
- "6379:6379"
|
||
healthcheck:
|
||
test: ["CMD", "redis-cli", "-a", "redis_password", "ping"]
|
||
interval: 10s
|
||
timeout: 5s
|
||
retries: 5
|
||
|
||
rustfs:
|
||
image: rustfs/rustfs:latest # 替换为生产可用的实际镜像
|
||
environment:
|
||
RUSTFS_ACCESS_KEY: minioadmin
|
||
RUSTFS_SECRET_KEY: minioadmin
|
||
volumes:
|
||
- rustfs_data:/data
|
||
ports:
|
||
- "9000:9000" # S3 API 端口
|
||
- "9001:9001" # Web 控制台端口
|
||
|
||
backend:
|
||
build:
|
||
context: .
|
||
dockerfile: Dockerfile
|
||
environment:
|
||
SPRING_DATASOURCE_URL: jdbc:postgresql://postgres:5432/label_db
|
||
SPRING_DATASOURCE_USERNAME: label
|
||
SPRING_DATASOURCE_PASSWORD: label_password
|
||
SPRING_REDIS_HOST: redis
|
||
SPRING_REDIS_PORT: 6379
|
||
SPRING_REDIS_PASSWORD: redis_password
|
||
RUSTFS_ENDPOINT: http://rustfs:9000
|
||
RUSTFS_ACCESS_KEY: minioadmin
|
||
RUSTFS_SECRET_KEY: minioadmin
|
||
AI_SERVICE_BASE_URL: http://ai-service:8000
|
||
ports:
|
||
- "8080:8080"
|
||
depends_on:
|
||
postgres:
|
||
condition: service_healthy
|
||
redis:
|
||
condition: service_healthy
|
||
rustfs:
|
||
condition: service_started
|
||
healthcheck:
|
||
test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
|
||
interval: 15s
|
||
retries: 5
|
||
|
||
ai-service:
|
||
build:
|
||
context: ../ai_service # Python FastAPI 服务所在目录
|
||
dockerfile: Dockerfile
|
||
environment:
|
||
RUSTFS_ENDPOINT: http://rustfs:9000
|
||
RUSTFS_ACCESS_KEY: minioadmin
|
||
RUSTFS_SECRET_KEY: minioadmin
|
||
ports:
|
||
- "8000:8000"
|
||
depends_on:
|
||
- rustfs
|
||
|
||
frontend:
|
||
image: nginx:alpine
|
||
volumes:
|
||
- ../frontend/dist:/usr/share/nginx/html:ro
|
||
- ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
|
||
ports:
|
||
- "80:80"
|
||
depends_on:
|
||
backend:
|
||
condition: service_healthy
|
||
|
||
volumes:
|
||
postgres_data:
|
||
redis_data:
|
||
rustfs_data:
|
||
```
|
||
|
||
**Dockerfile(backend):**
|
||
|
||
```dockerfile
|
||
FROM eclipse-temurin:17-jre-alpine
|
||
WORKDIR /app
|
||
COPY target/label-backend-*.jar app.jar
|
||
EXPOSE 8080
|
||
ENTRYPOINT ["java", "-jar", "app.jar"]
|
||
```
|
||
|
||
---
|
||
|
||
## 七、测试策略
|
||
|
||
### 7.1 基本原则
|
||
|
||
- 集成测试**必须**使用真实的 PostgreSQL 和 Redis 实例(Testcontainers),禁止仅 Mock 数据库
|
||
- 每个受保护接口组**必须**有至少一个集成测试覆盖 Shiro 认证/鉴权过滤器链(缺少 Token → 401,角色不足 → 403)
|
||
|
||
### 7.2 并发任务领取测试(必须)
|
||
|
||
```java
|
||
@Test
|
||
void concurrentClaimShouldOnlySucceedOnce() throws InterruptedException {
|
||
// 准备:创建一个 UNCLAIMED 任务
|
||
Long taskId = createUnclaimedTask();
|
||
int threadCount = 10;
|
||
CountDownLatch latch = new CountDownLatch(1);
|
||
AtomicInteger successCount = new AtomicInteger(0);
|
||
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
|
||
|
||
for (int i = 0; i < threadCount; i++) {
|
||
pool.submit(() -> {
|
||
try {
|
||
latch.await();
|
||
taskClaimService.claim(taskId);
|
||
successCount.incrementAndGet();
|
||
} catch (BusinessException e) {
|
||
// 预期:其余线程抛出"任务已被领取"
|
||
} catch (InterruptedException e) {
|
||
Thread.currentThread().interrupt();
|
||
}
|
||
});
|
||
}
|
||
latch.countDown();
|
||
pool.awaitTermination(5, SECONDS);
|
||
|
||
// 验证:仅 1 个线程领取成功
|
||
assertThat(successCount.get()).isEqualTo(1);
|
||
AnnotationTask task = taskMapper.selectById(taskId);
|
||
assertThat(task.getStatus()).isEqualTo("IN_PROGRESS");
|
||
assertThat(task.getClaimedBy()).isNotNull();
|
||
}
|
||
```
|
||
|
||
### 7.3 视频回调幂等测试(必须)
|
||
|
||
```java
|
||
@Test
|
||
void duplicateSuccessCallbackShouldBeIdempotent() {
|
||
Long sourceId = createVideoSource();
|
||
VideoProcessJob job = videoProcessService.createJob(sourceId, "FRAME_EXTRACT", params);
|
||
|
||
// 第一次成功回调
|
||
videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);
|
||
// 重复成功回调
|
||
videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);
|
||
|
||
// 验证:只创建一个 annotation_task
|
||
long taskCount = annotationTaskMapper.countBySourceId(sourceId);
|
||
assertThat(taskCount).isEqualTo(1);
|
||
}
|
||
```
|
||
|
||
### 7.4 状态机越界拒绝测试
|
||
|
||
```java
|
||
@Test
|
||
void illegalStateTransitionShouldThrow() {
|
||
// 验证:APPROVED → IN_PROGRESS 被拒绝
|
||
assertThatThrownBy(() ->
|
||
StateValidator.assertTransition(TaskStatus.APPROVED, TaskStatus.IN_PROGRESS, TaskStatus.TRANSITIONS)
|
||
).isInstanceOf(BusinessException.class)
|
||
.hasMessageContaining("非法状态转换");
|
||
}
|
||
```
|
||
|
||
### 7.5 多租户隔离测试
|
||
|
||
```java
|
||
@Test
|
||
void companyACannotAccessCompanyBData() {
|
||
Long sourceIdB = createSourceForCompany(companyB);
|
||
// 以 companyA 身份请求 companyB 的资源
|
||
CompanyContext.set(companyA);
|
||
assertThatThrownBy(() -> sourceService.findById(sourceIdB))
|
||
.isInstanceOf(BusinessException.class);
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 八、宪章合规检查清单
|
||
|
||
PR 合并前评审人**必须**逐条核对以下清单:
|
||
|
||
| # | 宪章原则 | 实现位置 | 检查要点 |
|
||
|---|----------|----------|----------|
|
||
| 1 | 环境约束(JDK 17、SB 3、Shiro、MyBatis Plus) | `pom.xml` | 版本号符合约束;无 Spring Security 并行引入 |
|
||
| 2 | 多租户数据隔离 | `TenantLineInnerInterceptor`、`CompanyContext` | 所有 Mapper 自动注入 company_id;ThreadLocal 在 finally 块清理 |
|
||
| 3 | BCrypt 密码、UUID Token、滑动过期、禁 JWT | `AuthService`、`TokenFilter`、`RedisKeyManager` | 无明文密码存储;每次有效请求重置 TTL;无 JWT 库引入 |
|
||
| 4 | 分级 RBAC、权限注解、角色变更驱逐缓存 | `UserRealm`、`@RequiresRoles`、`UserService` | 无 if-role 临时判断;`updateRole()` 立即删缓存 |
|
||
| 5 | 双流水线、级联触发 QA 任务、parent_source_id 溯源 | `ExtractionService.approve()` | 级联动作在同一 @Transactional 内;视频转文本的 parent_source_id 不为空 |
|
||
| 6 | 状态机完整性 | `StateValidator`、各 `*Status` 枚举 | 所有状态变更调用 `StateValidator`;无绕过直接写 Mapper 的路径 |
|
||
| 7 | 任务争抢双重保障 | `TaskClaimService.claim()` | Redis SET NX + DB WHERE status='UNCLAIMED' 两道并存 |
|
||
| 8 | 异步任务幂等、重试上限、FAILED 需手动重置 | `VideoProcessService.handleCallback()` | 重复成功回调静默忽略;`retry_count >= max_retries` 置 FAILED |
|
||
| 9 | 只追加审计日志、AOP 切面、审计失败不回滚业务 | `AuditAspect`、`@OperationLog` | `sys_operation_log` 无 UPDATE/DELETE;审计异常仅 error 日志 |
|
||
| 10 | RESTful URL、统一响应格式、分页必须 | `Result<T>`、各 Controller | 无动词路径;无裸 POJO 返回;所有列表接口有分页参数 |
|
||
| 11 | YAGNI:业务在 Service,Controller 只处理 HTTP | 包结构 | Controller 无业务判断逻辑;无未使用的抽象层 |
|
||
|
||
---
|
||
|
||
## 九、设计评审报告
|
||
|
||
**评审日期**:2026-04-09 | **评审方式**:独立子 Agent 五维评审 + 人工复核 | **初始质量评分**:6.5/10 → 修复后:**8.5/10**
|
||
|
||
### 9.1 发现并已修复的问题
|
||
|
||
| # | 严重级 | 位置 | 问题描述 | 修复内容 |
|
||
|---|--------|------|----------|----------|
|
||
| A | CRITICAL | §4.3 任务池接口 | `GET /api/tasks/pool` 的角色过滤逻辑不明确,开发者无法判断各角色看到哪些任务 | 在接口说明中补充:ANNOTATOR 看 EXTRACTION-UNCLAIMED;REVIEWER 看两类 UNCLAIMED;ADMIN 走全量接口 |
|
||
| B | HIGH | §4.4、§4.5 审批接口 | approve/reject 接口未声明"提交人不能自审"约束,存在质量绕过风险 | 在 `approve()` 和 `reject()` 代码示例中加入自审防护检查:`if (task.getClaimedBy().equals(getCurrentUserId())) throw` |
|
||
| C | HIGH | §4.8 视频处理 | 视频处理模块完全缺失 REST API 接口清单,帧模式/片段模式成功后的后续任务创建分支不清晰 | 补充 5 条接口(含 ADMIN-only 权限),在 handleCallback() 中明确两种模式的处理路径差异 |
|
||
| D | MEDIUM | §4.4 | 缺少 `ExtractionService.reject()` 代码示例,驳回后 source_data 状态不明确 | 补充 reject() 完整实现;说明 source_data 保持 EXTRACTING 等待标注员重提 |
|
||
| E | MEDIUM | §4.5 | 缺少 `QaService.reject()` 代码示例,training_dataset 驳回后状态流转不清晰 | 补充 reject():training_dataset → REJECTED;重提后 → PENDING_REVIEW |
|
||
| F | LOW | §4.3 | reassign 接口说明过于简洁,未说明 annotation_task_history 写入规则 | 补充说明:保持 IN_PROGRESS,history 写入 from=IN_PROGRESS/to=IN_PROGRESS + note |
|
||
|
||
### 9.2 未修复的设计选择(需业务澄清)
|
||
|
||
| # | 问题 | 背景 | 建议 |
|
||
|---|------|------|------|
|
||
| G | 原需求提到"若任务池中无空闲审批员,由标注员兼任审批" | 宪章要求 REVIEWER ⊃ ANNOTATOR 角色继承,系统无法动态提升 ANNOTATOR 权限 | **建议取消此机制**:统一要求 QA 审批必须由 REVIEWER 或 ADMIN 执行;若无可用 REVIEWER,由 ADMIN 代为审批。若需保留,需修改宪章并引入动态权限提升机制(超出当前架构范围) |
|
||
| H | `GET /api/tasks/{id}` 的精细访问控制 | ANNOTATOR 是否只能查看自己的任务?REJECTED 任务是否对原领取人可见? | 当前设计依赖 `company_id` 隔离,建议 Service 层加 `claimed_by = currentUserId OR role >= REVIEWER` 的访问控制检查 |
|
||
|
||
### 9.3 修复后宪章合规状态
|
||
|
||
| 宪章原则 | 修复前 | 修复后 |
|
||
|----------|--------|--------|
|
||
| 四、分级 RBAC(权限注解声明) | ✅ 基本覆盖 | ✅ 补充了自审防护,任务池过滤规则完整 |
|
||
| 五、双标注流水线(级联触发规则) | ⚠️ reject 路径缺失 | ✅ reject() 示例补全,source_data 回退路径明确 |
|
||
| 六、状态机完整性 | ⚠️ QA reject 状态转换不明 | ✅ training_dataset REJECTED → PENDING_REVIEW 完整 |
|
||
| 八、异步任务处理(幂等回调) | ⚠️ 两种视频模式处理分支缺失 | ✅ handleCallback 分支注释明确 |
|
||
|
||
---
|
||
|
||
### 9.4 审批流程合理性专项评审(第二轮)
|
||
|
||
**评审日期**:2026-04-09 | **评审焦点**:审批工作流是否可被实际执行
|
||
|
||
#### 9.4.1 发现的问题
|
||
|
||
| # | 严重级 | 位置 | 问题描述 | 根因 | 建议修复 |
|
||
|---|--------|------|----------|------|----------|
|
||
| I | CRITICAL | §4.3、§4.4、§4.5 | **REVIEWER 无审批收件箱**:`GET /api/tasks/pool` 仅返回 UNCLAIMED 任务,REVIEWER 无法发现哪些任务处于 SUBMITTED 状态等待审批。整个审批流程在 API 层面缺失入口。 | 任务池设计仅服务于"领取"场景,未考虑"审批"场景 | 新增接口:`GET /api/tasks/pending-review`(权限 REVIEWER),返回 `phase=EXTRACTION OR QA_GENERATION AND status=SUBMITTED` 的任务列表(分页);ADMIN 可查全部 |
|
||
| J | HIGH | §4.4 `ExtractionService.approve()` | **@Transactional 内同步调用 AI 服务**:`generateQa()` 在事务中发起 HTTP 请求,可能耗时 5–30 秒。将导致:① DB 连接被长期占用(连接池耗尽风险);② AI 调用失败会回滚已完成的审批动作(标注员和审批员的工作丢失);③ 审批接口响应时间不可预期。 | 审批与 QA 生成未解耦 | **两阶段拆分**:`approve()` 事务只完成步骤 1–3(is_final、APPROVED、history),然后发布 `ExtractionApprovedEvent`;`QaGenerationListener` 异步消费该事件,完成步骤 4–7(AI 调用、生成 QA、创建 QA 任务、更新 source_data)。若 AI 调用失败,审批结果不回滚,QA 任务可重试 |
|
||
| K | HIGH | §4.3、§4.4、§4.5 | **REJECTED 任务重拾路径未实现**:驳回后 task.status = REJECTED,标注员需"重新 claim",但:① `GET /api/tasks/pool` 仅展示 UNCLAIMED 任务,REJECTED 任务不可见;② 没有 `POST /api/tasks/{id}/reclaim` 接口;③ 状态机定义了 `REJECTED → IN_PROGRESS` 合法转换,但无 API 触发它。被驳回的标注员无任何操作路径。 | 驳回后续路径仅在状态机中声明,未转化为接口 | 方案一:修改 `GET /api/tasks/mine` 包含 REJECTED 状态,新增 `POST /api/tasks/{id}/reclaim` 接口(权限 ANNOTATOR,状态机校验 REJECTED → IN_PROGRESS,设置 status=IN_PROGRESS)。方案二:驳回时直接将 task 置为 UNCLAIMED,退回公共任务池(原领取人或他人均可重新领取)。**推荐方案一**(保留原领取人优先权,避免任务被他人抢占) |
|
||
| L | MEDIUM | §4.5 `QaService.approve()` | **重复变量声明(编译错误)**:方法内 `task` 变量被声明两次(第 1057 行与第 1065 行),Java 不允许同作用域内重复声明,此代码无法编译通过。 | 复制粘贴失误 | 删除第 1065 行的 `AnnotationTask task =`,直接使用第 1057 行已声明的 `task` 变量 |
|
||
| M | MEDIUM | §5.2 `SourceStatus` 状态机 | **source_data.REJECTED 状态不可达**:状态机声明 `QA_REVIEW → REJECTED` 合法,但 `QaService.reject()` 明确注释"source_data 保持 QA_REVIEW",从不触发此转换。REJECTED 是死状态,状态机与实现不一致,可能误导实现者。 | 状态机定义未与实现对齐 | 从 `SourceStatus.TRANSITIONS` 中移除 `QA_REVIEW → REJECTED` 转换,并在注释中说明:source_data 不会进入 REJECTED;QA 被驳回时 source_data 保持 QA_REVIEW,仅 annotation_task 进入 REJECTED |
|
||
|
||
#### 9.4.2 审批工作流完整路径(修复后期望状态)
|
||
|
||
```
|
||
EXTRACTION 阶段:
|
||
ANNOTATOR 领取(/api/tasks/{id}/claim)
|
||
→ 标注工作台提交(/api/extraction/{taskId}/submit)
|
||
→ REVIEWER 从审批收件箱发现(/api/tasks/pending-review)
|
||
→ 审批通过(/api/extraction/{taskId}/approve)
|
||
→ [异步] AI 生成 QA → 创建 QA_GENERATION 任务 → source_data → QA_REVIEW
|
||
→ 审批驳回(/api/extraction/{taskId}/reject)
|
||
→ 标注员从"我的任务"看到 REJECTED 任务
|
||
→ 重拾(/api/tasks/{id}/reclaim)→ IN_PROGRESS → 修改后重提
|
||
|
||
QA_GENERATION 阶段:
|
||
ANNOTATOR/REVIEWER 领取(/api/tasks/{id}/claim)
|
||
→ 问答对修改提交(/api/qa/{taskId}/submit)
|
||
→ REVIEWER 从审批收件箱发现(/api/tasks/pending-review)
|
||
→ 审批通过(/api/qa/{taskId}/approve)
|
||
→ training_dataset → APPROVED → source_data → APPROVED(流水线终态)
|
||
→ 审批驳回(/api/qa/{taskId}/reject)
|
||
→ training_dataset → REJECTED;source_data 保持 QA_REVIEW
|
||
→ 标注员重拾(/api/tasks/{id}/reclaim)→ 修改后重提
|
||
```
|
||
|
||
#### 9.4.3 需立即修复的条目
|
||
|
||
| 问题 | 修复位置 | 修复类型 |
|
||
|------|----------|----------|
|
||
| I(无审批收件箱) | §4.3 接口清单 | 新增接口 `GET /api/tasks/pending-review` |
|
||
| J(@Transactional 内 AI 调用) | §4.4 ExtractionService.approve() | 拆分为两阶段,步骤 4-7 异步化 |
|
||
| K(REJECTED 重拾路径缺失) | §4.3 接口清单 + §4.4/4.5 说明 | 新增接口 `POST /api/tasks/{id}/reclaim`;`GET /api/tasks/mine` 包含 REJECTED |
|
||
| L(重复变量声明) | §4.5 QaService.approve() 代码 | 删除第二个 `AnnotationTask task =` 声明 |
|
||
| M(source_data 死状态) | §5.2 SourceStatus 状态机 | 移除 `QA_REVIEW → REJECTED` 转换 |
|
||
|
||
---
|
||
|
||
### 9.5 数据库表设计完善性专项评审(第三轮)
|
||
|
||
**评审日期**:2026-04-09 | **评审焦点**:DDL 完整性、约束完备性、索引覆盖
|
||
|
||
#### 9.5.1 发现的问题(共 10 项)
|
||
|
||
| # | 严重级 | 表 | 问题描述 | 修复方案 |
|
||
|---|--------|-----|----------|----------|
|
||
| N | CRITICAL | `sys_config` | **全局唯一约束失效**:`UNIQUE (company_id, config_key)` 在 PostgreSQL 中,NULL 不与任何值相等(含另一个 NULL),导致 `company_id=NULL` 的行可重复插入相同 config_key。两条 `(NULL, 'model_default')` 均可写入,破坏全局配置覆盖语义。 | 将单一 UNIQUE 约束改为两个局部唯一索引(见下方 DDL 修复) |
|
||
| O | CRITICAL | `annotation_result` | **缺少 UNIQUE(task_id)**:`selectByTaskId()` 假设每个任务只有一条结果(1:1),但无唯一约束保证。高并发重复调用 `aiPreAnnotate()` 可能插入多行,导致查询时返回多行引发异常。 | 添加 `UNIQUE (task_id)` 约束 |
|
||
| P | CRITICAL | `training_dataset` | **`export_batch_id` 无引用完整性**:字段类型为 VARCHAR(50) 引用 `export_batch.batch_uuid`,但无 FK 约束。可写入不存在的批次 UUID,导出查询时出现幽灵引用。 | 改为 `export_batch_id BIGINT REFERENCES export_batch(id)`;对应代码改为存储 `export_batch.id` |
|
||
| Q | HIGH | 全部业务表 | **无 CHECK 约束保护枚举字段**:`sys_user.role`、`annotation_task.status`、`annotation_task.phase`、`source_data.status`、`training_dataset.status` 等均为裸 VARCHAR,代码 bug 或 DBA 手动 SQL 可写入 `'APPROVEEED'` 等非法值,状态机仅在应用层有效。 | 在每张表上为枚举字段添加 CHECK 约束(见下方 DDL 修复) |
|
||
| R | HIGH | `annotation_task_history` | **缺少 `operator_name` 快照**:当前只有 `operator_id`(FK)和 `operator_role` 快照。若用户被禁用或删除,历史记录无法追溯操作人姓名——与 `sys_operation_log` 的快照设计不一致。 | 新增 `operator_name VARCHAR(50) NOT NULL` 字段,写入时从当前 UserSession 读取用户名 |
|
||
| S | MEDIUM | `annotation_task` | **缺少 `(company_id, source_id)` 索引**:审批通过后查询"该 source 的全部任务"(用于 source 详情页),以及 `ExtractionService.approve()` 获取 source 类型,都需要按 source_id 查询。现有索引不覆盖此路径。 | 新增 `CREATE INDEX idx_annotation_task_source ON annotation_task(company_id, source_id)` |
|
||
| T | MEDIUM | `training_dataset` | **缺少 `task_id` 索引**:`approveByTaskId`、`rejectByTaskId`、`selectByTaskId` 均按 task_id 查询,但表上只有 `(company_id, status)` 和 `export_batch_id` 索引,无 task_id 索引。每次审批都需全表扫描(或依赖 FK 扫描)。 | 新增 `CREATE INDEX idx_training_dataset_task ON training_dataset(task_id)` |
|
||
| U | LOW | `sys_user` | **缺少 `created_by` 字段**:ADMIN 创建用户时无字段记录操作者,需查 `sys_operation_log` 才能溯源。表自身审计不完整。 | 新增 `created_by BIGINT REFERENCES sys_user(id)`(可为 NULL,系统初始化时无上级)|
|
||
| V | LOW | `source_data` | **缺少 `mime_type` 字段**:文件服务需要设置 Content-Type 响应头,当前只能从 `file_name` 后缀推断,对无扩展名或错误扩展名文件不可靠。 | 新增 `mime_type VARCHAR(100)` 字段,上传时由后端探测或由客户端提供 |
|
||
| W | LOW | 全部有 `updated_at` 的表 | **`updated_at` 无自动更新触发器**:所有表依赖应用层手动 `updated_at = NOW()`,遗漏时字段永远停在创建时间,无法用于数据变更监控。 | 为每张有 `updated_at` 的表创建 `UPDATE` 触发器(见下方 DDL 修复) |
|
||
|
||
#### 9.5.2 DDL 修复补丁(已直接修订 §2)
|
||
|
||
以下修复直接追加到 §2 DDL 末尾,作为补充 DDL:
|
||
|
||
```sql
|
||
-- =============================================
|
||
-- DDL 修复补丁(依据 §9.5 评审结论)
|
||
-- =============================================
|
||
|
||
-- N. sys_config:修复全局配置键唯一约束(PostgreSQL NULL != NULL 问题)
|
||
-- 删除原有约束,改为两个局部唯一索引
|
||
ALTER TABLE sys_config DROP CONSTRAINT IF EXISTS uq_config_company_key;
|
||
-- 全局配置:company_id IS NULL 时,config_key 唯一
|
||
CREATE UNIQUE INDEX uq_config_global_key
|
||
ON sys_config(config_key) WHERE company_id IS NULL;
|
||
-- 公司配置:同一公司内 config_key 唯一
|
||
CREATE UNIQUE INDEX uq_config_company_key
|
||
ON sys_config(company_id, config_key) WHERE company_id IS NOT NULL;
|
||
|
||
-- O. annotation_result:每个 task 只能有一条结果
|
||
ALTER TABLE annotation_result ADD CONSTRAINT uq_annotation_result_task UNIQUE (task_id);
|
||
|
||
-- P. training_dataset:export_batch_id 改为 BIGINT FK
|
||
-- (新建时使用,存量系统迁移需额外脚本)
|
||
ALTER TABLE training_dataset
|
||
ADD COLUMN export_batch_fk BIGINT REFERENCES export_batch(id);
|
||
-- 注意:export_batch_id VARCHAR(50) 字段保留用于兼容过渡,实现层改为写 export_batch_fk
|
||
|
||
-- Q. CHECK 约束:关键枚举字段
|
||
ALTER TABLE sys_user
|
||
ADD CONSTRAINT chk_user_role CHECK (role IN ('UPLOADER','ANNOTATOR','REVIEWER','ADMIN')),
|
||
ADD CONSTRAINT chk_user_status CHECK (status IN ('ACTIVE','DISABLED'));
|
||
|
||
ALTER TABLE source_data
|
||
ADD CONSTRAINT chk_source_type CHECK (data_type IN ('TEXT','IMAGE','VIDEO')),
|
||
ADD CONSTRAINT chk_source_status CHECK (status IN ('PENDING','PREPROCESSING','EXTRACTING','QA_REVIEW','APPROVED','REJECTED'));
|
||
|
||
ALTER TABLE annotation_task
|
||
ADD CONSTRAINT chk_task_phase CHECK (phase IN ('EXTRACTION','QA_GENERATION')),
|
||
ADD CONSTRAINT chk_task_status CHECK (status IN ('UNCLAIMED','IN_PROGRESS','SUBMITTED','APPROVED','REJECTED')),
|
||
ADD CONSTRAINT chk_task_type CHECK (task_type IN ('AI_ASSISTED','MANUAL'));
|
||
|
||
ALTER TABLE training_dataset
|
||
ADD CONSTRAINT chk_dataset_type CHECK (sample_type IN ('TEXT','IMAGE','VIDEO_FRAME')),
|
||
ADD CONSTRAINT chk_dataset_status CHECK (status IN ('PENDING_REVIEW','APPROVED','REJECTED'));
|
||
|
||
ALTER TABLE video_process_job
|
||
ADD CONSTRAINT chk_job_type CHECK (job_type IN ('FRAME_EXTRACT','VIDEO_TO_TEXT')),
|
||
ADD CONSTRAINT chk_job_status CHECK (status IN ('PENDING','RUNNING','SUCCESS','FAILED','RETRYING'));
|
||
|
||
-- R. annotation_task_history:补充 operator_name 快照字段
|
||
ALTER TABLE annotation_task_history
|
||
ADD COLUMN operator_name VARCHAR(50);
|
||
-- 历史存量行 operator_name 置为 '(unknown)',新增行必须填写
|
||
UPDATE annotation_task_history SET operator_name = '(unknown)' WHERE operator_name IS NULL;
|
||
ALTER TABLE annotation_task_history ALTER COLUMN operator_name SET NOT NULL;
|
||
|
||
-- S. annotation_task:补充 source_id 查询索引
|
||
CREATE INDEX idx_annotation_task_source ON annotation_task(company_id, source_id);
|
||
|
||
-- T. training_dataset:补充 task_id 查询索引
|
||
CREATE INDEX idx_training_dataset_task ON training_dataset(task_id);
|
||
|
||
-- U. sys_user:补充 created_by
|
||
ALTER TABLE sys_user ADD COLUMN created_by BIGINT REFERENCES sys_user(id);
|
||
|
||
-- V. source_data:补充 mime_type
|
||
ALTER TABLE source_data ADD COLUMN mime_type VARCHAR(100);
|
||
|
||
-- W. updated_at 自动更新触发器(以 annotation_task 为例,其余表同理)
|
||
CREATE OR REPLACE FUNCTION set_updated_at()
|
||
RETURNS TRIGGER AS $$
|
||
BEGIN
|
||
NEW.updated_at = NOW();
|
||
RETURN NEW;
|
||
END;
|
||
$$ LANGUAGE plpgsql;
|
||
|
||
-- 为所有有 updated_at 的表创建触发器
|
||
DO $$
|
||
DECLARE
|
||
tbl TEXT;
|
||
BEGIN
|
||
FOREACH tbl IN ARRAY ARRAY[
|
||
'sys_company','sys_user','source_data','annotation_task',
|
||
'annotation_result','training_dataset','export_batch','sys_config','video_process_job'
|
||
] LOOP
|
||
EXECUTE format(
|
||
'CREATE TRIGGER trg_%I_updated_at
|
||
BEFORE UPDATE ON %I
|
||
FOR EACH ROW EXECUTE FUNCTION set_updated_at()',
|
||
tbl, tbl
|
||
);
|
||
END LOOP;
|
||
END;
|
||
$$;
|
||
```
|
||
|
||
#### 9.5.3 需业务确认的事项
|
||
|
||
| # | 问题 | 背景 | 建议 |
|
||
|---|------|------|------|
|
||
| X | `training_dataset` 每个 QA 任务对应几条记录? | 一次 QA 任务可能产出多个问答对(一份文档提取了 5 个三元组,就有 5 个 QA 对)。如果是多条,`task_id` 索引应采用普通索引而非唯一索引。 | 确认多对一(一个 task → 多条 training_dataset)后,`task_id` 只建普通索引,`approveByTaskId`/`rejectByTaskId` 批量更新逻辑不变 |
|
||
| Y | `export_batch.dataset_file_path` 是否应为 NOT NULL? | 当前为 nullable,批次先创建再上传文件后更新路径。如果上传 RustFS 和 INSERT export_batch 在同一事务内,可直接 NOT NULL。 | 如能保证原子性,将 `dataset_file_path` 改为 NOT NULL,防止空路径记录存在 |
|