label_backend Development and Implementation Guide

Version: 1.0.0 | Date: 2026-04-09 | Based on: backend后台设计.md v2.2 + constitution.md v1.1.0




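1. Project Overview

1.1 System Positioning

label_backend is the backend service of an intelligent knowledge-graph annotation platform. Built on Spring Boot 3, it drives two annotation pipelines, the text line and the image line, and processes raw documents, images, and videos into training datasets in GLM fine-tuning format.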

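1.2 Data Pipelines

Text line:   document → triple extraction (subject / predicate / object + source snippet + character offsets)
               → QA pair generation → review → training sample (GLM text format)

Image line:  image → quadruple extraction (subject / relation / object / modifier + bbox + cropped image)
               → QA pair generation → review → training sample (GLM image-text format)

Video preprocessing (not a standalone pipeline):
  Frame mode:   video → AI frame extraction → each frame enters the image line as an image
  Segment mode: video → multimodal model transcribes to text → derived TEXT source_data → enters the text line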

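1.3 Technology Stack Matrix

| Layer | Technology | Version constraint |
|---|---|---|
| Runtime | JDK | 17 (LTS) |
| Framework | Spring Boot | ≥ 3.0.x |
| Authentication / authorization | Apache Shiro | ≥ 1.13.x (compatible with Spring Boot 3) |
| ORM | MyBatis Plus | ≥ 3.5.x |
| Database | PostgreSQL | ≥ 14 |
| Cache / locks | Redis | ≥ 6.x |
| Object storage | RustFS (S3-compatible API) | Current stable release |
| AI service | Python FastAPI (called over HTTP) | JVM side only calls it through RestClient |
| Containerization | Docker Compose | ≥ 2.x |
| Build tool | Maven | ≥ 3.8 |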

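1.4 Backend Module List

| Module | Responsibility |
|---|---|
| User and permission module | User management, Shiro RBAC, token authentication |
| Source data management module | File upload to RustFS, source_data metadata management |
| Task pool module | Task creation, claiming (distributed lock + optimistic lock), status transitions |
| Annotation workbench module | Calls AI to extract triples/quadruples, writes annotation_result JSONB |
| QA generation module | Calls AI to generate candidate QA pairs, manages training_dataset |
| Training data export module | JSONL batch export, GLM fine-tuning integration |
| System configuration module | Prompt templates, model parameters, global and company-level configuration |
| Video processing module | Tracks asynchronous frame-extraction / video-to-text jobs, handles callbacks idempotently |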

1.5 Package Structure

com.label
├── LabelBackendApplication.java
├── common/
│   ├── result/           # Result<T>, ResultCode, PageResult<T>
│   ├── exception/        # BusinessException, GlobalExceptionHandler
│   ├── context/          # CompanyContext (ThreadLocal)
│   ├── shiro/            # TokenFilter, UserRealm, ShiroConfig
│   ├── redis/            # RedisKeyManager, RedisService
│   ├── aop/              # AuditAspect, @OperationLog annotation
│   ├── storage/          # RustFsClient (S3-compatible wrapper)
│   ├── ai/               # AiServiceClient (RestClient wrapper for 8 endpoints)
│   └── statemachine/     # StateValidator, status enums
├── module/
│   ├── user/
│   │   ├── controller/   # AuthController, UserController
│   │   ├── service/      # AuthService, UserService
│   │   ├── mapper/       # SysUserMapper, SysCompanyMapper
│   │   ├── entity/       # SysUser, SysCompany
│   │   └── dto/          # LoginRequest, UserDTO, UserCreateRequest
│   ├── source/
│   │   ├── controller/   # SourceController
│   │   ├── service/      # SourceService
│   │   ├── mapper/       # SourceDataMapper
│   │   ├── entity/       # SourceData
│   │   └── dto/          # SourceUploadResponse, SourceListQuery
│   ├── task/
│   │   ├── controller/   # TaskController
│   │   ├── service/      # TaskService, TaskClaimService
│   │   ├── mapper/       # AnnotationTaskMapper, TaskHistoryMapper
│   │   ├── entity/       # AnnotationTask, AnnotationTaskHistory
│   │   └── dto/          # TaskCreateRequest, TaskPoolQuery, TaskDetailDTO
│   ├── annotation/
│   │   ├── controller/   # ExtractionController, QaController
│   │   ├── service/      # ExtractionService, QaService
│   │   ├── mapper/       # AnnotationResultMapper, TrainingDatasetMapper
│   │   ├── entity/       # AnnotationResult, TrainingDataset
│   │   └── dto/          # ExtractionResultDTO, QaItemDTO, RejectRequest
│   ├── export/
│   │   ├── controller/   # ExportController
│   │   ├── service/      # ExportService, FinetuneService
│   │   ├── mapper/       # ExportBatchMapper
│   │   ├── entity/       # ExportBatch
│   │   └── dto/          # ExportBatchRequest, FinetuneRequest
│   ├── config/
│   │   ├── controller/   # SysConfigController
│   │   ├── service/      # SysConfigService
│   │   ├── mapper/       # SysConfigMapper
│   │   ├── entity/       # SysConfig
│   │   └── dto/          # ConfigUpdateRequest
│   └── video/
│       ├── controller/   # VideoController
│       ├── service/      # VideoProcessService
│       ├── mapper/       # VideoProcessJobMapper
│       ├── entity/       # VideoProcessJob
│       └── dto/          # VideoProcessRequest, VideoCallbackRequest



2. Database DDL

Execution order: sys_company → sys_user → source_data → annotation_task → annotation_result → training_dataset → export_batch → sys_config → sys_operation_log → annotation_task_history → video_process_job

-- =============================================
-- 1. sys_company: company table (multi-tenant root)
-- =============================================
CREATE TABLE sys_company (
    id           BIGSERIAL    PRIMARY KEY,
    company_name VARCHAR(100) NOT NULL UNIQUE,
    company_code VARCHAR(50)  NOT NULL UNIQUE,
    status       VARCHAR(10)  NOT NULL DEFAULT 'ACTIVE',  -- ACTIVE / DISABLED
    created_at   TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at   TIMESTAMP    NOT NULL DEFAULT NOW()
);

-- =============================================
-- 2. sys_user: user table
-- =============================================
CREATE TABLE sys_user (
    id            BIGSERIAL    PRIMARY KEY,
    company_id    BIGINT       NOT NULL REFERENCES sys_company(id),
    username      VARCHAR(50)  NOT NULL,
    password_hash VARCHAR(255) NOT NULL,  -- BCrypt, cost factor >= 10
    real_name     VARCHAR(50),
    role          VARCHAR(20)  NOT NULL,  -- UPLOADER / ANNOTATOR / REVIEWER / ADMIN
    status        VARCHAR(10)  NOT NULL DEFAULT 'ACTIVE',
    created_at    TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at    TIMESTAMP    NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_company_username UNIQUE (company_id, username)
);
CREATE INDEX idx_sys_user_company ON sys_user(company_id);

-- =============================================
-- 3. source_data: raw source material metadata table
-- =============================================
CREATE TABLE source_data (
    id               BIGSERIAL    PRIMARY KEY,
    company_id       BIGINT       NOT NULL REFERENCES sys_company(id),
    uploader_id      BIGINT       REFERENCES sys_user(id),
    data_type        VARCHAR(20)  NOT NULL,   -- TEXT / IMAGE / VIDEO
    file_path        VARCHAR(500) NOT NULL,
    file_name        VARCHAR(255) NOT NULL,
    file_size        BIGINT,
    bucket_name      VARCHAR(100) NOT NULL,
    parent_source_id BIGINT       REFERENCES source_data(id),  -- for video-to-text output, points to the original video
    status           VARCHAR(20)  NOT NULL DEFAULT 'PENDING',
    -- PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED / REJECTED
    reject_reason    TEXT,
    created_at       TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at       TIMESTAMP    NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_source_data_company ON source_data(company_id);
CREATE INDEX idx_source_data_status  ON source_data(company_id, status);
CREATE INDEX idx_source_data_parent  ON source_data(parent_source_id);

-- =============================================
-- 4. annotation_task: annotation task table
-- =============================================
CREATE TABLE annotation_task (
    id              BIGSERIAL   PRIMARY KEY,
    company_id      BIGINT      NOT NULL REFERENCES sys_company(id),
    source_id       BIGINT      NOT NULL REFERENCES source_data(id),
    phase           VARCHAR(20) NOT NULL,  -- EXTRACTION / QA_GENERATION
    task_type       VARCHAR(20) NOT NULL,  -- AI_ASSISTED / MANUAL
    ai_model        VARCHAR(50),
    video_unit_type VARCHAR(20),           -- FRAME; set only in video frame mode, NULL otherwise
    video_unit_info JSONB,                 -- {"frame_index":150,"time_sec":5.0,"frame_path":"..."}
    claimed_by      BIGINT      REFERENCES sys_user(id),
    claimed_at      TIMESTAMP,
    status          VARCHAR(20) NOT NULL DEFAULT 'UNCLAIMED',
    -- UNCLAIMED / IN_PROGRESS / SUBMITTED / APPROVED / REJECTED
    reject_reason   TEXT,
    submitted_at    TIMESTAMP,
    completed_at    TIMESTAMP,
    created_at      TIMESTAMP   NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMP   NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_annotation_task_company ON annotation_task(company_id);
CREATE INDEX idx_annotation_task_pool    ON annotation_task(company_id, phase, status);
CREATE INDEX idx_annotation_task_claimed ON annotation_task(claimed_by, status);

-- =============================================
-- 5. annotation_result: annotation result table (EXTRACTION phase, aggregated as JSONB)
-- =============================================
CREATE TABLE annotation_result (
    id           BIGSERIAL PRIMARY KEY,
    company_id   BIGINT    NOT NULL REFERENCES sys_company(id),
    task_id      BIGINT    NOT NULL REFERENCES annotation_task(id),
    result_json  JSONB     NOT NULL,  -- overwritten as a whole; partial PATCH is forbidden
    is_final     BOOLEAN   NOT NULL DEFAULT FALSE,
    submitted_by BIGINT    REFERENCES sys_user(id),
    created_at   TIMESTAMP NOT NULL DEFAULT NOW(),
    updated_at   TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_annotation_result_task  ON annotation_result(task_id);
CREATE INDEX idx_annotation_result_final ON annotation_result(company_id, is_final);

-- =============================================
-- 6. training_dataset: training sample table (final state of QA pairs)
-- =============================================
CREATE TABLE training_dataset (
    id                   BIGSERIAL    PRIMARY KEY,
    company_id           BIGINT       NOT NULL REFERENCES sys_company(id),
    task_id              BIGINT       NOT NULL REFERENCES annotation_task(id),
    source_id            BIGINT       NOT NULL REFERENCES source_data(id),
    extraction_result_id BIGINT       NOT NULL REFERENCES annotation_result(id),
    sample_type          VARCHAR(20)  NOT NULL,  -- TEXT / IMAGE / VIDEO_FRAME
    glm_format_json      JSONB        NOT NULL,  -- GLM fine-tuning format; JSONB allows field-level queries
    export_batch_id      VARCHAR(50),            -- NULL until exported
    status               VARCHAR(20)  NOT NULL DEFAULT 'PENDING_REVIEW',
    -- PENDING_REVIEW / APPROVED / REJECTED
    reject_reason        TEXT,
    reviewed_by          BIGINT       REFERENCES sys_user(id),
    exported_at          TIMESTAMP,
    created_at           TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at           TIMESTAMP    NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_training_dataset_company ON training_dataset(company_id);
CREATE INDEX idx_training_dataset_status  ON training_dataset(company_id, status);
CREATE INDEX idx_training_dataset_batch   ON training_dataset(export_batch_id);

-- =============================================
-- 7. export_batch: export batch table
-- =============================================
CREATE TABLE export_batch (
    id                BIGSERIAL    PRIMARY KEY,
    company_id        BIGINT       NOT NULL REFERENCES sys_company(id),
    batch_uuid        VARCHAR(50)  NOT NULL UNIQUE,
    dataset_file_path VARCHAR(500),
    sample_count      INT          NOT NULL DEFAULT 0,
    glm_job_id        VARCHAR(100),          -- set after the fine-tuning API is called; initially NULL
    finetune_status   VARCHAR(20)  NOT NULL DEFAULT 'NOT_STARTED',
    -- NOT_STARTED / RUNNING / SUCCESS / FAILED
    error_message     TEXT,
    created_by        BIGINT       REFERENCES sys_user(id),
    created_at        TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at        TIMESTAMP    NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_export_batch_company ON export_batch(company_id);

-- =============================================
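-- 8. sys_config: system configuration table (global defaults + company-level overrides)
-- =============================================
CREATE TABLE sys_config (
    id           BIGSERIAL    PRIMARY KEY,
    company_id   BIGINT       REFERENCES sys_company(id),  -- NULL = global default
    config_key   VARCHAR(100) NOT NULL,
    config_value TEXT         NOT NULL,
    description  TEXT,
    updated_by   BIGINT       REFERENCES sys_user(id),
    updated_at   TIMESTAMP    NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_config_company_key UNIQUE (company_id, config_key)
);
-- UNIQUE treats NULLs as distinct, so the constraint above does not stop duplicate
-- global rows; a partial unique index covers the company_id IS NULL case.
CREATE UNIQUE INDEX uq_config_global_key ON sys_config(config_key) WHERE company_id IS NULL;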

-- Preset global configuration entries
INSERT INTO sys_config (company_id, config_key, config_value, description) VALUES
(NULL, 'prompt_extract_text',  '...', 'Prompt template for text triple extraction'),
(NULL, 'prompt_extract_image', '...', 'Prompt template for image quadruple extraction'),
(NULL, 'prompt_video_to_text', '...', 'Prompt template for video-to-text'),
(NULL, 'prompt_qa_gen_text',   '...', 'Prompt template for text QA pair generation'),
(NULL, 'prompt_qa_gen_image',  '...', 'Prompt template for image QA pair generation'),
(NULL, 'model_default',        'glm-4', 'Default AI-assist model'),
(NULL, 'video_frame_interval', '30', 'Frame sampling interval in video frame mode (in frames)'),
(NULL, 'token_ttl_seconds',    '7200', 'Token lifetime (seconds)'),
(NULL, 'glm_api_base_url',     'http://ai-service:8000', 'GLM API base URL');

-- =============================================
-- 9. sys_operation_log: operation audit log (append-only, partitioned by month)
-- =============================================
CREATE TABLE sys_operation_log (
    id             BIGSERIAL   NOT NULL,
    company_id     BIGINT      REFERENCES sys_company(id),
    operator_id    BIGINT      REFERENCES sys_user(id),    -- may be NULL on failed login
    operator_name  VARCHAR(50) NOT NULL,                   -- snapshot of the username at operation time
    operation_type VARCHAR(50) NOT NULL,
    target_type    VARCHAR(30),
    target_id      BIGINT,
    detail         JSONB,
    ip_address     VARCHAR(50),
    result         VARCHAR(10) NOT NULL,  -- SUCCESS / FAIL
    error_message  TEXT,
    created_at     TIMESTAMP   NOT NULL DEFAULT NOW(),
    -- PostgreSQL requires the primary key of a partitioned table to include the partition key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Initial partitions (pg_partman is recommended for automatic maintenance)
CREATE TABLE sys_operation_log_2026_04
    PARTITION OF sys_operation_log
    FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');

CREATE TABLE sys_operation_log_2026_05
    PARTITION OF sys_operation_log
    FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');

-- =============================================
-- 10. annotation_task_history: task transition history (append-only)
-- =============================================
CREATE TABLE annotation_task_history (
    id            BIGSERIAL   PRIMARY KEY,
    company_id    BIGINT      NOT NULL REFERENCES sys_company(id),
    task_id       BIGINT      NOT NULL REFERENCES annotation_task(id),
    from_status   VARCHAR(20),            -- NULL when the task is first created
    to_status     VARCHAR(20) NOT NULL,
    operator_id   BIGINT      NOT NULL REFERENCES sys_user(id),
    operator_role VARCHAR(20) NOT NULL,   -- snapshot of the role at operation time
    note          TEXT,                   -- rejection reason, forced-reassignment note, etc.
    created_at    TIMESTAMP   NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_task_history_task ON annotation_task_history(task_id);

-- =============================================
-- 11. video_process_job: asynchronous video processing job table
-- =============================================
CREATE TABLE video_process_job (
    id              BIGSERIAL    PRIMARY KEY,
    company_id      BIGINT       NOT NULL REFERENCES sys_company(id),
    source_id       BIGINT       NOT NULL REFERENCES source_data(id),
    job_type        VARCHAR(20)  NOT NULL,  -- FRAME_EXTRACT / VIDEO_TO_TEXT
    status          VARCHAR(20)  NOT NULL DEFAULT 'PENDING',
    -- PENDING / RUNNING / SUCCESS / FAILED / RETRYING
    params          JSONB        NOT NULL,
    total_units     INT,
    processed_units INT          NOT NULL DEFAULT 0,
    output_path     VARCHAR(500),
    retry_count     INT          NOT NULL DEFAULT 0,
    max_retries     INT          NOT NULL DEFAULT 3,
    error_message   TEXT,
    started_at      TIMESTAMP,
    completed_at    TIMESTAMP,
    created_at      TIMESTAMP    NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMP    NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_video_process_job_source ON video_process_job(source_id);
CREATE INDEX idx_video_process_job_status ON video_process_job(status);
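
The sys_config table defined in the DDL above stores global defaults as rows with company_id NULL and lets a company override them with its own rows. The lookup order this implies can be sketched in memory as follows. ConfigResolver and its methods are hypothetical names used only for illustration; the spec does not show how SysConfigService resolves values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not from the spec: prefers the company-level value and
// falls back to the global default (the row with company_id IS NULL).
final class ConfigResolver {
    // config_key -> config_value for rows with company_id IS NULL
    private final Map<String, String> globalDefaults = new HashMap<>();
    // company_id -> (config_key -> config_value)
    private final Map<Long, Map<String, String>> companyOverrides = new HashMap<>();

    void putGlobal(String key, String value) {
        globalDefaults.put(key, value);
    }

    void putForCompany(long companyId, String key, String value) {
        companyOverrides.computeIfAbsent(companyId, id -> new HashMap<>()).put(key, value);
    }

    Optional<String> resolve(long companyId, String key) {
        Map<String, String> overrides = companyOverrides.get(companyId);
        if (overrides != null && overrides.containsKey(key)) {
            return Optional.of(overrides.get(key));
        }
        return Optional.ofNullable(globalDefaults.get(key));
    }
}
```

In SQL terms this corresponds to querying both the company row and the NULL row for a key and taking the company row when it exists.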



3. Common Infrastructure

3.1 Unified Response Wrapper

// com/label/common/result/Result.java
public record Result<T>(String code, T data, String message) {
    public static <T> Result<T> ok(T data) {
        return new Result<>("SUCCESS", data, null);
    }
    public static Result<Void> ok() {
        return new Result<>("SUCCESS", null, null);
    }
    public static <T> Result<T> fail(String code, String message) {
        return new Result<>(code, null, message);
    }
}

// com/label/common/result/PageResult.java
public record PageResult<T>(List<T> items, long total, int page, int pageSize) {}

Every Controller returns Result<T>; returning a bare POJO or a bare List is forbidden. Paginated endpoints return Result<PageResult<T>>.
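
As a rough illustration of the paginated shape, the sketch below wraps a slice of an in-memory list in Result<PageResult<T>>. PageDemo and the 1-based page numbering are assumptions made for this example; the spec does not say how pages are numbered. The two records are repeated in trimmed form so the snippet compiles on its own.

```java
import java.util.List;

// Trimmed copies of the records above, repeated so the sketch is self-contained.
record Result<T>(String code, T data, String message) {
    static <T> Result<T> ok(T data) {
        return new Result<>("SUCCESS", data, null);
    }
}

record PageResult<T>(List<T> items, long total, int page, int pageSize) {}

// Hypothetical helper: returns one 1-based page of an in-memory list.
final class PageDemo {
    static <T> Result<PageResult<T>> page(List<T> all, int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        // Clamp both bounds so a page past the end yields an empty list
        int from = (int) Math.min((long) (page - 1) * pageSize, all.size());
        int to = Math.min(from + pageSize, all.size());
        List<T> items = List.copyOf(all.subList(from, to));
        return Result.ok(new PageResult<>(items, all.size(), page, pageSize));
    }
}
```

A real Controller would obtain items and total from a MyBatis Plus page query rather than slicing a list.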

3.2 Global Exception Handling

// com/label/common/exception/GlobalExceptionHandler.java
@RestControllerAdvice
public class GlobalExceptionHandler {

    // SLF4J logger (org.slf4j.Logger / org.slf4j.LoggerFactory)
    private static final Logger log = LoggerFactory.getLogger(GlobalExceptionHandler.class);

    @ExceptionHandler(BusinessException.class)
    public ResponseEntity<Result<?>> handleBusiness(BusinessException ex) {
        return ResponseEntity.status(ex.getHttpStatus())
            .body(Result.fail(ex.getCode(), ex.getMessage()));
    }

    @ExceptionHandler(UnauthorizedException.class)
    public ResponseEntity<Result<?>> handleUnauthorized() {
        return ResponseEntity.status(403)
            .body(Result.fail("FORBIDDEN", "Insufficient permissions"));
    }

    @ExceptionHandler(Exception.class)
    public ResponseEntity<Result<?>> handleUnexpected(Exception ex, HttpServletRequest req) {
        log.error("Unexpected error on {}", req.getRequestURI(), ex);
        // Production responses must never include a stack trace
        return ResponseEntity.status(500)
            .body(Result.fail("INTERNAL_ERROR", "Internal server error"));
    }
}

3.3 Multi-Tenancy: CompanyContext (ThreadLocal)

// com/label/common/context/CompanyContext.java
public final class CompanyContext {
    private static final ThreadLocal<Long> COMPANY_ID = new ThreadLocal<>();

    private CompanyContext() {}  // static utility, never instantiated

    public static void set(Long companyId) { COMPANY_ID.set(companyId); }
    public static Long get() { return COMPANY_ID.get(); }
    public static void clear() { COMPANY_ID.remove(); }  // must be called in a finally block
}

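Set the context in the Shiro TokenFilter after executeLogin succeeds, and call CompanyContext.clear() in the filter's finally block so that a reused pool thread cannot leak one tenant's data to another.

Callers must not pass company_id in the request body. Every Service obtains it through CompanyContext.get().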
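
The sketch below shows why the finally block matters. It runs two simulated requests on the same single-thread pool, standing in for a servlet container reusing a worker thread. TenantLeakDemo is a hypothetical class written for this example, and CompanyContext is repeated so the snippet compiles on its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompanyContext {
    private static final ThreadLocal<Long> COMPANY_ID = new ThreadLocal<>();
    private CompanyContext() {}
    static void set(Long companyId) { COMPANY_ID.set(companyId); }
    static Long get() { return COMPANY_ID.get(); }
    static void clear() { COMPANY_ID.remove(); }
}

// Hypothetical demo: two "requests" share one worker thread.
final class TenantLeakDemo {
    // Returns the company id visible to a second request that never calls set().
    static Long secondRequestSees(boolean clearInFinally) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable firstRequest = () -> {
                try {
                    CompanyContext.set(1L);  // request authenticated as tenant 1
                } finally {
                    if (clearInFinally) {
                        CompanyContext.clear();
                    }
                }
            };
            pool.submit(firstRequest).get();
            Callable<Long> secondRequest = CompanyContext::get;
            return pool.submit(secondRequest).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With clearing enabled the second request sees no tenant; without it, the value set by the first request is still attached to the reused thread.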

MyBatis Plus is configured with TenantLineInnerInterceptor, which automatically injects a company_id = ? condition into all Mapper queries:

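@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
    interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(
        new TenantLineHandler() {
            @Override
            public Expression getTenantId() {
                Long companyId = CompanyContext.get();
                // Fail with a clear message instead of a NullPointerException on unboxing;
                // code that runs before login must set the context or be excluded from tenant filtering
                if (companyId == null) {
                    throw new IllegalStateException("CompanyContext is not set");
                }
                return new LongValue(companyId);
            }
            @Override
            public String getTenantIdColumn() { return "company_id"; }
            @Override
            public boolean ignoreTable(String tableName) {
                // sys_company has no company_id column
                return "sys_company".equalsIgnoreCase(tableName);
            }
        }
    ));
    return interceptor;
}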

3.4 Shiro Configuration

TokenFilter (extends AuthenticatingFilter)

// com/label/common/shiro/TokenFilter.java
public class TokenFilter extends AuthenticatingFilter {

    @Override
    protected AuthenticationToken createToken(ServletRequest req, ServletResponse res) {
        String token = extractToken((HttpServletRequest) req);
        return new TokenAuthenticationToken(token);
    }

    @Override
    protected boolean onAccessDenied(ServletRequest req, ServletResponse res) throws Exception {
        String token = extractToken((HttpServletRequest) req);
        if (token == null) {
            sendUnauthorized(res, "Missing token");
            return false;
        }
        return executeLogin(req, res);
    }

    @Override
    protected boolean onLoginSuccess(AuthenticationToken token, Subject subject,
                                     ServletRequest req, ServletResponse res) throws Exception {
        UserSession session = (UserSession) subject.getPrincipal();
        // Sliding expiration: every valid request resets the TTL
        long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
        redisTemplate.expire(RedisKeyManager.tokenKey(session.getToken()), ttl, SECONDS);
        // Set the tenant context (valid for the whole filter chain; cleared in the finally block of doFilterInternal)
        CompanyContext.set(session.getCompanyId());
        return true;  // continue the filter chain (on to the Controller)
    }

    // ⚠️ The ThreadLocal must be cleared here, not in onLoginSuccess:
    //    the filter chain only continues after onLoginSuccess returns true, so the Controller has not run yet at that point.
    //    The finally block in doFilterInternal guarantees cleanup once the request ends, whether it completed normally or threw.
    //    Declared public because the inherited AdviceFilter.doFilterInternal is public.
    @Override
    public void doFilterInternal(ServletRequest req, ServletResponse res, FilterChain chain)
            throws ServletException, IOException {
        try {
            super.doFilterInternal(req, res, chain);
        } finally {
            CompanyContext.clear();
        }
    }

    private String extractToken(HttpServletRequest req) {
        String header = req.getHeader("Authorization");
        if (header != null && header.startsWith("Bearer ")) {
            return header.substring(7);
        }
        return null;
    }
}

UserRealm (extends AuthorizingRealm)

// com/label/common/shiro/UserRealm.java
public class UserRealm extends AuthorizingRealm {

    // Accept the custom token type; by default a realm only supports UsernamePasswordToken
    @Override
    public boolean supports(AuthenticationToken token) {
        return token instanceof TokenAuthenticationToken;
    }

    // Authentication: check that token:{uuid} exists in Redis
    @Override
    protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) {
        String uuid = ((TokenAuthenticationToken) token).getToken();
        Map<Object, Object> data = redisTemplate.opsForHash()
            .entries(RedisKeyManager.tokenKey(uuid));
        if (data.isEmpty()) throw new UnknownAccountException("Token is invalid or expired");
        UserSession session = UserSession.from(uuid, data);
        return new SimpleAuthenticationInfo(session, uuid, getName());
    }

    // Authorization: check Redis user:perm:{userId} first; on a miss, fall back to PostgreSQL
    @Override
    protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) {
        UserSession session = (UserSession) principals.getPrimaryPrincipal();
        String permKey = RedisKeyManager.userPermKey(session.getUserId());
        String cachedRole = (String) redisTemplate.opsForValue().get(permKey);
        if (cachedRole == null) {
            SysUser user = sysUserMapper.selectById(session.getUserId());
            cachedRole = user.getRole();
            redisTemplate.opsForValue().set(permKey, cachedRole, 5, MINUTES);
        }
        SimpleAuthorizationInfo info = new SimpleAuthorizationInfo();
        info.addRole(cachedRole);
        // Role inheritance: a higher role includes the permissions of every lower role
        addInheritedRoles(info, cachedRole);
        return info;
    }

    private void addInheritedRoles(SimpleAuthorizationInfo info, String role) {
        // ADMIN ⊃ REVIEWER ⊃ ANNOTATOR ⊃ UPLOADER
        switch (role) {
            case "ADMIN"     -> info.addRoles(Set.of("REVIEWER", "ANNOTATOR", "UPLOADER"));
            case "REVIEWER"  -> info.addRoles(Set.of("ANNOTATOR", "UPLOADER"));
            case "ANNOTATOR" -> info.addRoles(Set.of("UPLOADER"));
        }
    }
}
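
The inheritance chain ADMIN ⊃ REVIEWER ⊃ ANNOTATOR ⊃ UPLOADER can also be expressed as an ordered enum, which removes the need for a hand-written switch. This is an alternative sketch, not the spec's approach: the Role enum and its methods are hypothetical, and the realm above works with plain String role names.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical enum; constants are declared from lowest to highest privilege.
enum Role {
    UPLOADER, ANNOTATOR, REVIEWER, ADMIN;

    // A role implies itself plus every role declared before it.
    Set<Role> implied() {
        return EnumSet.range(UPLOADER, this);
    }

    // True when a user holding this role passes a check for the required role.
    boolean satisfies(Role required) {
        return implied().contains(required);
    }
}
```

Under this model a check for the lowest acceptable role is enough, since every higher role already implies it.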

ShiroConfig filter chain

@Bean
public ShiroFilterChainDefinition shiroFilterChainDefinition() {
    DefaultShiroFilterChainDefinition chain = new DefaultShiroFilterChainDefinition();
    chain.addPathDefinition("/api/auth/login", "anon");
    chain.addPathDefinition("/api/**", "tokenFilter");
    return chain;
}

Declare permissions with annotations; ad hoc if-role checks inside a Service are forbidden:

@RequiresRoles("ADMIN")
public void createTask(...)

@RequiresRoles(value = {"ANNOTATOR", "REVIEWER", "ADMIN"}, logical = Logical.OR)
public void claimTask(...)

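Evict the cache immediately on role change

// Inside UserService.updateRole()
@Transactional
public void updateRole(Long userId, String newRole) {
    userMapper.updateRole(userId, newRole);
    // Evict immediately instead of waiting for the TTL to expire
    // (a delay leaves a security window when a high-privilege account is being disabled)
    redisTemplate.delete(RedisKeyManager.userPermKey(userId));
}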

3.5 Redis Key Management

// com/label/common/redis/RedisKeyManager.java
public final class RedisKeyManager {
    public static String tokenKey(String uuid)    { return "token:" + uuid; }
    public static String userPermKey(Long userId) { return "user:perm:" + userId; }
    public static String taskClaimKey(Long taskId){ return "task:claim:" + taskId; }
}

Do not invent keys outside these three namespaces for authentication, permission, or locking purposes.

3.6 AOP Audit Log Aspect

// com/label/common/aop/OperationLog.java (annotation)
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface OperationLog {
    String type();          // matches an operation_type value, e.g. "TASK_CLAIM"
    String targetType() default "";
}

// com/label/common/aop/AuditAspect.java
@Aspect
@Component
public class AuditAspect {

    @Around("@annotation(operationLog)")
    public Object audit(ProceedingJoinPoint pjp, OperationLog operationLog) throws Throwable {
        String result = "SUCCESS";
        String errorMsg = null;
        Object returnVal = null;
        try {
            returnVal = pjp.proceed();
        } catch (Throwable ex) {
            result = "FAIL";
            errorMsg = ex.getMessage();
            throw ex;
        } finally {
            // Written as an independent operation after the business transaction has committed or rolled back;
            // this holds only when the aspect runs outside the transaction advice.
            // A failed audit write is logged at error level and must never roll back the business transaction.
            try {
                saveAuditLog(operationLog.type(), operationLog.targetType(), result, errorMsg);
            } catch (Exception auditEx) {
                log.error("Failed to write audit log: type={}", operationLog.type(), auditEx);
            }
        }
        return returnVal;
    }

    private void saveAuditLog(String type, String targetType, String result, String errorMsg) {
        UserSession session = getCurrentSession();
        // Named "entry" so it does not shadow the class logger "log"
        SysOperationLog entry = SysOperationLog.builder()
            .companyId(CompanyContext.get())
            .operatorId(session != null ? session.getUserId() : null)
            .operatorName(session != null ? session.getUsername() : "unknown")
            .operationType(type)
            .targetType(targetType.isEmpty() ? null : targetType)
            .result(result)
            .errorMessage(errorMsg)
            .ipAddress(getClientIp())
            .build();
        operationLogMapper.insert(entry);
    }
}

3.7 RustFS S3 Client

RustFS implements an S3-compatible API; connect to it with the AWS SDK for Java v2:

// com/label/common/storage/RustFsClient.java
@Component
public class RustFsClient {

    private final S3Client s3Client;  // endpoint configured to point at the RustFS address

    public String upload(String bucket, String key, InputStream data, long size) {
        PutObjectRequest req = PutObjectRequest.builder().bucket(bucket).key(key).build();
        s3Client.putObject(req, RequestBody.fromInputStream(data, size));
        return key;
    }

    public InputStream download(String bucket, String key) {
        GetObjectRequest req = GetObjectRequest.builder().bucket(bucket).key(key).build();
        return s3Client.getObject(req);
    }

    public void delete(String bucket, String key) {
        s3Client.deleteObject(b -> b.bucket(bucket).key(key));
    }

    public String getPresignedUrl(String bucket, String key, Duration expiry) {
        // S3Presigner holds resources, so close it after use (or reuse one shared instance)
        try (S3Presigner presigner = S3Presigner.builder().endpointOverride(endpointUri).build()) {
            return presigner.presignGetObject(b -> b.signatureDuration(expiry)
                .getObjectRequest(r -> r.bucket(bucket).key(key))).url().toString();
        }
    }
}

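Object storage path conventions:

| Resource type | Bucket | Path format |
|---|---|---|
| Uploaded text file | source-data | text/{yyyyMM}/{source_id}.txt |
| Uploaded image | source-data | image/{yyyyMM}/{source_id}.jpg |
| Uploaded video | source-data | video/{yyyyMM}/{source_id}.mp4 |
| Video frame image | source-data | frames/{source_id}/{frame_index}.jpg |
| Text transcribed from a video segment | source-data | video-text/{parent_source_id}/{timestamp}.txt |
| bbox crop | source-data | crops/{task_id}/{item_index}.jpg |
| Exported JSONL | finetune-export | export/{batchUuid}.jsonl |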
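
Keeping these path formats in one helper avoids string concatenation scattered across services. The sketch below is one possible shape under that assumption; StoragePaths and its method names are hypothetical and do not appear in the spec. The video-text path is left out because the spec does not define the format of {timestamp}.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that mirrors the path table above.
final class StoragePaths {
    private static final DateTimeFormatter YEAR_MONTH = DateTimeFormatter.ofPattern("yyyyMM");

    private StoragePaths() {}

    // Path for an uploaded TEXT / IMAGE / VIDEO file
    static String upload(String dataType, long sourceId, YearMonth month) {
        String ym = month.format(YEAR_MONTH);
        return switch (dataType) {
            case "TEXT"  -> "text/" + ym + "/" + sourceId + ".txt";
            case "IMAGE" -> "image/" + ym + "/" + sourceId + ".jpg";
            case "VIDEO" -> "video/" + ym + "/" + sourceId + ".mp4";
            default -> throw new IllegalArgumentException("Unsupported data type: " + dataType);
        };
    }

    static String frame(long sourceId, int frameIndex) {
        return "frames/" + sourceId + "/" + frameIndex + ".jpg";
    }

    static String crop(long taskId, int itemIndex) {
        return "crops/" + taskId + "/" + itemIndex + ".jpg";
    }

    static String export(String batchUuid) {
        return "export/" + batchUuid + ".jsonl";
    }
}
```

Passing the month in explicitly, rather than reading the clock inside the helper, keeps the result deterministic in tests.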

3.8 AI Service HTTP Client

On Spring Boot 3, RestClient wraps the synchronous HTTP calls to the Python FastAPI service:

// com/label/common/ai/AiServiceClient.java
@Component
public class AiServiceClient {

    private final RestClient restClient;

    public AiServiceClient(@Value("${ai.service.base-url}") String baseUrl) {
        this.restClient = RestClient.builder().baseUrl(baseUrl).build();
    }

    // POST /api/v1/text/extract
    public TextExtractResponse extractText(String filePath, String model) {
        return restClient.post().uri("/api/v1/text/extract")
            .body(Map.of("file_path", filePath, "model", model))
            .retrieve().body(TextExtractResponse.class);
    }

    // POST /api/v1/image/extract
    public ImageExtractResponse extractImage(String imagePath, String model) {
        return restClient.post().uri("/api/v1/image/extract")
            .body(Map.of("image_path", imagePath, "model", model))
            .retrieve().body(ImageExtractResponse.class);
    }

    // POST /api/v1/video/extract-frames
    public VideoFramesResponse extractFrames(String videoPath, String frameInterval, String mode) {
        return restClient.post().uri("/api/v1/video/extract-frames")
            .body(Map.of("video_path", videoPath, "frame_interval", frameInterval, "mode", mode))
            .retrieve().body(VideoFramesResponse.class);
    }

    // POST /api/v1/video/to-text
    public VideoToTextResponse videoToText(String videoPath, int startSec, int endSec, String model) {
        return restClient.post().uri("/api/v1/video/to-text")
            .body(Map.of("video_path", videoPath, "start_sec", startSec, "end_sec", endSec, "model", model))
            .retrieve().body(VideoToTextResponse.class);
    }

    // POST /api/v1/qa/gen-text
    public QaGenResponse genTextQa(List<?> triples, String model, String promptTemplate) {
        return restClient.post().uri("/api/v1/qa/gen-text")
            .body(Map.of("items", triples, "model", model, "prompt_template", promptTemplate))
            .retrieve().body(QaGenResponse.class);
    }

    // POST /api/v1/qa/gen-image
    public QaGenResponse genImageQa(List<?> quadruples, String model, String promptTemplate) {
        return restClient.post().uri("/api/v1/qa/gen-image")
            .body(Map.of("items", quadruples, "model", model, "prompt_template", promptTemplate))
            .retrieve().body(QaGenResponse.class);
    }

    // POST /api/v1/finetune/start
    public FinetuneStartResponse startFinetune(String jsonlUrl, String baseModel, Map<String, Object> params) {
        return restClient.post().uri("/api/v1/finetune/start")
            .body(Map.of("jsonl_url", jsonlUrl, "base_model", baseModel, "params", params))
            .retrieve().body(FinetuneStartResponse.class);
    }

    // GET /api/v1/finetune/status/{jobId}
    public FinetuneStatusResponse getFinetuneStatus(String jobId) {
        return restClient.get().uri("/api/v1/finetune/status/{jobId}", jobId)
            .retrieve().body(FinetuneStatusResponse.class);
    }
}



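4. Business Modules (Vertical Slices)

4.1 User and Permission Module

Entity SysUser: fields mirror the DDL; passwordHash is annotated with @JsonIgnore so it is never serialized into a response.

AuthService core logic: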

// Login
@OperationLog(type = "USER_LOGIN")
public String login(Long companyId, String username, String password, String ip) {
    SysUser user = userMapper.selectByCompanyAndUsername(companyId, username);
    if (user == null || !BCrypt.checkpw(password, user.getPasswordHash()))
        throw new BusinessException("USER_NOT_FOUND", "Invalid username or password");
    if ("DISABLED".equals(user.getStatus()))
        throw new BusinessException("USER_DISABLED", "Account is disabled");

    String token = UUID.randomUUID().toString();
    Map<String, Object> tokenData = Map.of(
        "userId", user.getId(), "role", user.getRole(), "companyId", user.getCompanyId(),
        "username", user.getUsername()
    );
    long ttl = sysConfigService.getLong("token_ttl_seconds", 7200L);
    redisTemplate.opsForHash().putAll(RedisKeyManager.tokenKey(token), tokenData);
    redisTemplate.expire(RedisKeyManager.tokenKey(token), ttl, SECONDS);
    return token;
}

// Logout
@OperationLog(type = "USER_LOGOUT")
public void logout(String token) {
    redisTemplate.delete(RedisKeyManager.tokenKey(token));
}

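Endpoints:

| Method | Path | Minimum role | Description |
|---|---|---|---|
| POST | /api/auth/login | Anonymous | Returns a token (UUID) |
| POST | /api/auth/logout | Authenticated | Deletes the token from Redis |
| GET | /api/auth/me | Authenticated | Current user info and role |
| GET | /api/users | ADMIN | Paginated user list |
| POST | /api/users | ADMIN | Create a user |
| PUT | /api/users/{id} | ADMIN | Update user info |
| PUT | /api/users/{id}/status | ADMIN | Enable/disable an account (also evicts the permission cache) |
| PUT | /api/users/{id}/role | ADMIN | Change role (also evicts the permission cache) |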

4.2 Source Data Management Module

SourceService core logic:

@OperationLog(type = "SOURCE_UPLOAD")
@Transactional(rollbackFor = Exception.class)  // also roll back on the checked IOException
public SourceData upload(MultipartFile file, String dataType) throws IOException {
    Long companyId = CompanyContext.get();
    // 1. Insert the source_data row first to obtain the ID used to build the storage path.
    //    file_path is NOT NULL, so it holds an empty placeholder until step 3.
    SourceData sd = SourceData.builder()
        .companyId(companyId).uploaderId(getCurrentUserId())
        .dataType(dataType).fileName(file.getOriginalFilename())
        .fileSize(file.getSize()).bucketName("source-data")
        .filePath("").status("PENDING").build();
    sourceDataMapper.insert(sd);

    // 2. Build the path and upload to RustFS
    String path = buildPath(dataType, sd.getId());
    rustFsClient.upload("source-data", path, file.getInputStream(), file.getSize());

    // 3. Update file_path
    sd.setFilePath(path);
    sourceDataMapper.updateById(sd);
    return sd;
}

private String buildPath(String dataType, Long sourceId) {
    String ym = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMM"));
    return switch (dataType) {
        case "TEXT"  -> "text/" + ym + "/" + sourceId + ".txt";
        case "IMAGE" -> "image/" + ym + "/" + sourceId + ".jpg";
        case "VIDEO" -> "video/" + ym + "/" + sourceId + ".mp4";
        default -> throw new BusinessException("INVALID_TYPE", "Unsupported source data type");
    };
}

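Endpoints:

| Method | Path | Minimum role | Description |
|---|---|---|---|
| POST | /api/source/upload | UPLOADER | Upload a file and create a source_data record |
| GET | /api/source/list | UPLOADER | Paginated query (UPLOADER sees only own uploads; ADMIN sees all) |
| GET | /api/source/{id} | UPLOADER | Source data details |
| DELETE | /api/source/{id} | ADMIN | Delete the source data and its RustFS file |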

4.3 Task Management Module (with concurrency control)

TaskClaimService.claim() relies on a two-layer safeguard:

@Transactional
@OperationLog(type = "TASK_CLAIM")
public void claim(Long taskId) {
    Long userId = getCurrentUserId();
    // Layer 1: Redis distributed lock (TTL 30s, SET NX)
    Boolean locked = redisTemplate.opsForValue()
        .setIfAbsent(RedisKeyManager.taskClaimKey(taskId), userId.toString(), 30, SECONDS);
    if (!Boolean.TRUE.equals(locked))
        throw new BusinessException("TASK_CLAIMED", "任务已被他人领取");

    // Layer 2: optimistic database constraint (WHERE status = 'UNCLAIMED')
    // UPDATE annotation_task SET status='IN_PROGRESS', claimed_by=?, claimed_at=NOW()
    //   WHERE id=? AND status='UNCLAIMED' AND company_id=?
    int affected = taskMapper.claimTask(taskId, userId, CompanyContext.get());
    if (affected == 0)
        throw new BusinessException("TASK_CLAIMED", "任务已被他人领取");

    // Write the task history row (same transaction)
    insertHistory(taskId, "UNCLAIMED", "IN_PROGRESS", userId, null);
}

@Transactional   // the status update and the history row must commit together
public void unclaim(Long taskId) {
    StateValidator.assertTransition(TaskStatus.IN_PROGRESS, TaskStatus.UNCLAIMED, TaskStatus.TRANSITIONS);
    // Set claimed_by = NULL, status = UNCLAIMED
    taskMapper.unclaim(taskId, getCurrentUserId(), CompanyContext.get());
    redisTemplate.delete(RedisKeyManager.taskClaimKey(taskId));
    insertHistory(taskId, "IN_PROGRESS", "UNCLAIMED", getCurrentUserId(), null);
}

Every status change must call insertHistory() within the same transaction to write a row to annotation_task_history.
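
To see why the two guards in claim() let only one caller win, here is a small simulation using only the standard library. It is a sketch under assumptions: `putIfAbsent` on a `ConcurrentHashMap` stands in for Redis `SET NX`, and a compare-and-set on an `AtomicReference` stands in for `UPDATE ... WHERE status = 'UNCLAIMED'`. `ClaimSimulation` and its members are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** In-memory model of the claim race; no Redis or database involved. */
final class ClaimSimulation {
    enum Status { UNCLAIMED, IN_PROGRESS }

    private final ConcurrentHashMap<Long, Long> lockKeys = new ConcurrentHashMap<>();        // stand-in for Redis keys
    private final AtomicReference<Status> status = new AtomicReference<>(Status.UNCLAIMED);  // stand-in for the task row

    /** Returns true only for the single caller that passes both guards. */
    boolean claim(long taskId, long userId) {
        // Guard 1: SET NX equivalent; only the first writer gets null back.
        if (lockKeys.putIfAbsent(taskId, userId) != null) return false;
        // Guard 2: conditional update equivalent; succeeds only while the row is UNCLAIMED.
        return status.compareAndSet(Status.UNCLAIMED, Status.IN_PROGRESS);
    }

    /** Starts the given number of threads at once and counts successful claims. */
    static int race(int threads) {
        ClaimSimulation sim = new ClaimSimulation();
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            long userId = i;
            pool.submit(() -> {
                try {
                    start.await();
                    if (sim.claim(1L, userId)) winners.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();   // stop accepting work so awaitTermination can return early
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }
}
```

Either guard alone would already serialize the claim in this model. In the real service the database condition is the authoritative check, and the Redis key mainly keeps contending requests from reaching the database.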

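Endpoint list:

| Method | Path | Minimum role | Description |
| --- | --- | --- | --- |
| POST | /api/tasks | ADMIN | Create an EXTRACTION task for a given source |
| GET | /api/tasks/pool | ANNOTATOR | List claimable tasks (filtered by role, paginated) |
| POST | /api/tasks/{id}/claim | ANNOTATOR | Claim a task (first come, first served) |
| POST | /api/tasks/{id}/unclaim | ANNOTATOR | Give up a task and return it to the pool |
| GET | /api/tasks/mine | ANNOTATOR | List the tasks I have claimed (paginated) |
| GET | /api/tasks/{id} | ANNOTATOR | View task details |
| GET | /api/tasks | ADMIN | Query all tasks (filterable, paginated) |
| PUT | /api/tasks/{id}/reassign | ADMIN | Force-reassign task ownership |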

4.4 Annotation Workbench Module (EXTRACTION stage)

ExtractionService core logic:

// AI-assisted pre-annotation (called after the annotator claims the task)
public AnnotationResult aiPreAnnotate(Long taskId) {
    AnnotationTask task = taskMapper.selectById(taskId);
    SourceData source = sourceDataMapper.selectById(task.getSourceId());
    // Images and video-derived units go to the image endpoint; everything else is treated as text
    boolean imageLike = "IMAGE".equals(source.getDataType()) || task.getVideoUnitType() != null;
    Object aiResult = imageLike
        ? aiServiceClient.extractImage(resolveImagePath(task), task.getAiModel())
        : aiServiceClient.extractText(source.getFilePath(), task.getAiModel());
    AnnotationResult result = buildAnnotationResult(task, aiResult);
    annotationResultMapper.insertOrReplace(result);
    return result;
}

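// Update the extraction result (whole-document overwrite, PUT semantics)
public void updateResult(Long taskId, String resultJsonStr) {
    // Validate that the payload is well-formed JSON
    validateResultJson(resultJsonStr);
    // Replace result_json as a whole; partial PATCH and item-by-item appends are not allowed
    annotationResultMapper.updateResultJson(taskId, resultJsonStr, CompanyContext.get());
}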

// Approval: the cascade below must complete within a single transaction
@Transactional
@OperationLog(type = "EXTRACTION_APPROVE")
public void approve(Long taskId) {
    AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
    AnnotationResult result = annotationResultMapper.selectByTaskId(taskId);

    // 1. annotation_result.is_final = true
    result.setIsFinal(true);
    annotationResultMapper.updateById(result);

    // 2. annotation_task.status → APPROVED
    StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS);
    task.setStatus("APPROVED");
    task.setCompletedAt(LocalDateTime.now());
    taskMapper.updateById(task);

    // 3. Write the task history row
    insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);

    // 4. Call the AI service to generate candidate QA pairs
    String promptKey = "IMAGE".equals(getSourceType(task)) ? "prompt_qa_gen_image" : "prompt_qa_gen_text";
    String promptTemplate = sysConfigService.get(promptKey);
    QaGenResponse qaResponse = generateQa(task, result, promptTemplate);

    // 5. Write the candidate QA pairs to training_dataset (PENDING_REVIEW)
    List<TrainingDataset> samples = buildTrainingSamples(task, result, qaResponse);
    trainingDatasetMapper.batchInsert(samples);

    // 6. Create the QA_GENERATION stage task (UNCLAIMED)
    AnnotationTask qaTask = buildQaTask(task);
    taskMapper.insert(qaTask);
    insertHistory(qaTask.getId(), null, "UNCLAIMED", getCurrentUserId(), null);

    // 7. source_data.status → QA_REVIEW
    sourceDataMapper.updateStatus(task.getSourceId(), "QA_REVIEW", CompanyContext.get());
}

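Endpoint list:

| Method | Path | Minimum role | Description |
| --- | --- | --- | --- |
| GET | /api/extraction/{taskId} | ANNOTATOR | Get the current extraction result (including AI pre-annotation) |
| PUT | /api/extraction/{taskId} | ANNOTATOR | Update the extraction result (whole JSONB overwrite) |
| POST | /api/extraction/{taskId}/submit | ANNOTATOR | Submit the extraction result to the review queue |
| POST | /api/extraction/{taskId}/approve | REVIEWER | Approve; automatically triggers QA task creation |
| POST | /api/extraction/{taskId}/reject | REVIEWER | Reject, with a rejection reason |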

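4.5 QA Generation Module (QA_GENERATION stage)

QaService uses the same whole-document overwrite logic as ExtractionService (PUT semantics; partial PATCH is not allowed).

approve cascade (single transaction):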

@Transactional
@OperationLog(type = "QA_APPROVE")
public void approve(Long taskId) {
    // 1. Validate the task before touching any data, so an invalid request changes nothing
    AnnotationTask task = validateAndGetTask(taskId, "SUBMITTED");
    StateValidator.assertTransition(TaskStatus.SUBMITTED, TaskStatus.APPROVED, TaskStatus.TRANSITIONS);

    // 2. training_dataset.status → APPROVED
    trainingDatasetMapper.approveByTaskId(taskId, getCurrentUserId(), CompanyContext.get());

    // 3. annotation_task.status → APPROVED
    task.setStatus("APPROVED");
    task.setCompletedAt(LocalDateTime.now());
    taskMapper.updateById(task);

    // 4. source_data.status → APPROVED (the whole pipeline is complete)
    sourceDataMapper.updateStatus(task.getSourceId(), "APPROVED", CompanyContext.get());

    // 5. Write the task history row
    insertHistory(taskId, "SUBMITTED", "APPROVED", getCurrentUserId(), null);
}

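Endpoint list:

| Method | Path | Minimum role | Description |
| --- | --- | --- | --- |
| GET | /api/qa/{taskId} | ANNOTATOR | Get the list of candidate QA pairs |
| PUT | /api/qa/{taskId} | ANNOTATOR | Edit QA pairs (whole overwrite) |
| POST | /api/qa/{taskId}/submit | ANNOTATOR | Submit QA pairs to the review queue |
| POST | /api/qa/{taskId}/approve | REVIEWER | Approve; the training_dataset samples become APPROVED |
| POST | /api/qa/{taskId}/reject | REVIEWER | Reject, with a rejection reason |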

4.6 Training Data Export Module

ExportService.createBatch() core logic:

@Transactional
@OperationLog(type = "EXPORT_CREATE")
public ExportBatch createBatch(List<Long> sampleIds) {
    Long companyId = CompanyContext.get();
    // 1. Verify that every sample is in APPROVED status
    List<TrainingDataset> samples = trainingDatasetMapper
        .selectByIdsAndStatus(sampleIds, "APPROVED", companyId);
    if (samples.size() != sampleIds.size())
        throw new BusinessException("INVALID_SAMPLES", "部分样本不处于 APPROVED 状态");

    // 2. Build the JSONL content (one glm_format_json per line)
    String batchUuid = UUID.randomUUID().toString();
    String jsonl = samples.stream()
        .map(s -> s.getGlmFormatJson().toString())
        .collect(Collectors.joining("\n"));

    // 3. Upload to RustFS (finetune-export/export/{batchUuid}.jsonl)
    String path = "export/" + batchUuid + ".jsonl";
    byte[] bytes = jsonl.getBytes(StandardCharsets.UTF_8);
    rustFsClient.upload("finetune-export", path, new ByteArrayInputStream(bytes), bytes.length);

    // 4. Update export_batch_id and exported_at on the samples
    trainingDatasetMapper.batchUpdateExportInfo(sampleIds, batchUuid, LocalDateTime.now());

    // 5. Insert the export_batch record
    ExportBatch batch = ExportBatch.builder()
        .companyId(companyId).batchUuid(batchUuid).datasetFilePath(path)
        .sampleCount(samples.size()).finetuneStatus("NOT_STARTED")
        .createdBy(getCurrentUserId()).build();
    exportBatchMapper.insert(batch);
    return batch;
}
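
Step 2 above only produces valid JSONL if every serialized sample occupies exactly one line. A minimal sketch of a guard for that assumption follows; `Jsonl.join` is a hypothetical helper, and it assumes the records are already serialized as compact JSON strings.

```java
import java.util.List;

/** Hypothetical helper: joins compact JSON records into JSONL, one record per line. */
final class Jsonl {
    static String join(List<String> compactJsonObjects) {
        for (String json : compactJsonObjects) {
            // A raw line break inside one record would split it into two broken JSONL lines.
            if (json.contains("\n") || json.contains("\r"))
                throw new IllegalArgumentException("JSON record must be single-line: " + json);
        }
        return String.join("\n", compactJsonObjects);
    }
}
```

Line breaks inside JSON string values are escaped as `\n` by any conforming serializer, so this check only trips on pretty-printed output.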

FinetuneService.trigger() calls aiServiceClient.startFinetune(...) to obtain the glmJobId, then updates export_batch.glm_job_id and sets finetune_status = RUNNING.

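Endpoint list (all require the ADMIN role):

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/training/samples | Paginated query of approved samples awaiting export |
| POST | /api/export/batch | Create an export batch, merge the JSONL, and upload it to RustFS |
| POST | /api/export/{batchId}/finetune | Submit a fine-tuning job to the GLM factory |
| GET | /api/export/{batchId}/status | Query the fine-tuning job status |
| GET | /api/export/list | Paginated query of all export batches |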

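4.7 System Configuration Module

SysConfigService.get(configKey) first looks up (companyId, configKey); on a miss it falls back to the global default at (NULL, configKey). A company-level value always overrides the global one.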

public String get(String configKey) {
    Long companyId = CompanyContext.get();
    // Look up the company-level value first
    SysConfig cfg = configMapper.selectByCompanyAndKey(companyId, configKey);
    if (cfg == null) {
        // Fall back to the global default (company_id IS NULL)
        cfg = configMapper.selectByCompanyAndKey(null, configKey);
    }
    return cfg != null ? cfg.getConfigValue() : null;
}
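
The lookup order can be exercised without a database. The sketch below models the config table as a map keyed by (companyId, key), with a null companyId marking a global default. `ConfigLookup` and its methods are hypothetical; `getLong` only illustrates how a typed accessor with a default, like the `getLong("token_ttl_seconds", 7200L)` call used at login, could sit on top of the same fallback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** In-memory model of the company-then-global config lookup (hypothetical class). */
final class ConfigLookup {
    /** A null companyId marks a global default row. */
    private record ConfigKey(Long companyId, String key) {}

    private final Map<ConfigKey, String> rows = new HashMap<>();

    void put(Long companyId, String key, String value) {
        rows.put(new ConfigKey(companyId, key), value);
    }

    /** Company-level value if present, otherwise the global default, otherwise empty. */
    Optional<String> get(Long companyId, String key) {
        String companyValue = rows.get(new ConfigKey(companyId, key));
        if (companyValue != null) return Optional.of(companyValue);
        return Optional.ofNullable(rows.get(new ConfigKey(null, key)));
    }

    /** Typed accessor: falls back to defaultValue when neither level defines the key. */
    long getLong(Long companyId, String key, long defaultValue) {
        return get(companyId, key).map(Long::parseLong).orElse(defaultValue);
    }
}
```

With a global `token_ttl_seconds` of 7200 and a company 1 override of 3600, company 1 reads 3600 while every other company reads 7200.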

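Endpoint list (all require the ADMIN role):

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/config | Get all configuration items (company-level plus global defaults) |
| PUT | /api/config/{key} | Update a single configuration item (including Prompt templates) |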

4.8 Video Processing Module

VideoProcessService core logic:

@Transactional
@OperationLog(type = "TASK_CREATE")
public VideoProcessJob createJob(Long sourceId, String jobType, JsonNode params) {
    Long companyId = CompanyContext.get();
    // 1. source_data.status → PREPROCESSING
    sourceDataMapper.updateStatus(sourceId, "PREPROCESSING", companyId);
    // 2. Create the video_process_job (PENDING)
    VideoProcessJob job = VideoProcessJob.builder()
        .companyId(companyId).sourceId(sourceId)
        .jobType(jobType).params(params).status("PENDING").build();
    videoProcessJobMapper.insert(job);
    // 3. Call the AI service asynchronously (non-blocking; the AI service calls back on its own)
    triggerAiAsync(job);
    return job;
}

// AI service callback: idempotent handling
@Transactional
public void handleCallback(Long jobId, boolean success, String outputPath, String errorMsg) {
    VideoProcessJob job = videoProcessJobMapper.selectById(jobId);
    // Idempotency: if the job is already SUCCESS, silently ignore the duplicate callback;
    // annotation_task must never be created twice
    if ("SUCCESS".equals(job.getStatus())) return;

    if (success) {
        job.setStatus("SUCCESS");
        job.setOutputPath(outputPath);
        job.setCompletedAt(LocalDateTime.now());
        // source_data.status → PENDING (enters the downstream annotation flow)
        sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
    } else {
        if (job.getRetryCount() >= job.getMaxRetries()) {
            // Max retries reached: mark FAILED; an ADMIN must reset the job to PENDING before it can be triggered again
            job.setStatus("FAILED");
            job.setErrorMessage(errorMsg);
            sourceDataMapper.updateStatus(job.getSourceId(), "PENDING", job.getCompanyId());
        } else {
            job.setStatus("RETRYING");
            job.setRetryCount(job.getRetryCount() + 1);
            job.setErrorMessage(errorMsg);
        }
    }
    videoProcessJobMapper.updateById(job);
}
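
The branch structure of handleCallback can be reduced to a pure function that maps the current job state and callback outcome to the next status, which makes the idempotency and retry-limit rules easy to test in isolation. This is a sketch, not the service code; `CallbackDecision` and `next` are hypothetical names.

```java
/** Pure-function model of the callback decision; no persistence involved. */
final class CallbackDecision {
    enum JobStatus { PENDING, RUNNING, SUCCESS, FAILED, RETRYING }

    /** Next job status after a callback, mirroring the branches of handleCallback. */
    static JobStatus next(JobStatus current, boolean success, int retryCount, int maxRetries) {
        if (current == JobStatus.SUCCESS) return JobStatus.SUCCESS;  // duplicate callback: no-op
        if (success) return JobStatus.SUCCESS;
        // Failure: give up once the retry budget is exhausted, otherwise schedule another attempt.
        return retryCount >= maxRetries ? JobStatus.FAILED : JobStatus.RETRYING;
    }
}
```

Note that a late failure callback arriving after SUCCESS is also ignored, because the SUCCESS check runs first.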



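5. State Machine Implementation Rules

Every status change must be validated by StateValidator.assertTransition(); calling a Mapper directly to update a status field without that check is prohibited.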

5.1 StateValidator

// com/label/common/statemachine/StateValidator.java
public final class StateValidator {
    public static <S> void assertTransition(S from, S to, Map<S, Set<S>> transitions) {
        Set<S> allowed = transitions.getOrDefault(from, Set.of());
        if (!allowed.contains(to))
            throw new BusinessException("INVALID_STATE_TRANSITION",
                "非法状态转换: " + from + " → " + to);
    }
}
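
A self-contained usage sketch of the validator, wired to the annotation_task transitions from section 5.3. Two things here are assumptions: `IllegalStateException` stands in for the project's `BusinessException`, and `TransitionDemo.isAllowed` is a hypothetical convenience wrapper.

```java
import java.util.Map;
import java.util.Set;

/** Standalone demo of table-driven transition checks. */
final class TransitionDemo {
    enum TaskStatus { UNCLAIMED, IN_PROGRESS, SUBMITTED, APPROVED, REJECTED }

    static final Map<TaskStatus, Set<TaskStatus>> TRANSITIONS = Map.of(
        TaskStatus.UNCLAIMED,   Set.of(TaskStatus.IN_PROGRESS),
        TaskStatus.IN_PROGRESS, Set.of(TaskStatus.SUBMITTED, TaskStatus.UNCLAIMED, TaskStatus.IN_PROGRESS),
        TaskStatus.SUBMITTED,   Set.of(TaskStatus.APPROVED, TaskStatus.REJECTED),
        TaskStatus.REJECTED,    Set.of(TaskStatus.IN_PROGRESS)
    );

    /** Throws if the table does not list "to" as a successor of "from". */
    static <S> void assertTransition(S from, S to, Map<S, Set<S>> transitions) {
        // States absent from the table (here APPROVED) have no outgoing transitions.
        Set<S> allowed = transitions.getOrDefault(from, Set.of());
        if (!allowed.contains(to))
            throw new IllegalStateException("Illegal state transition: " + from + " -> " + to);
    }

    /** Boolean wrapper around assertTransition for callers that prefer a check over an exception. */
    static boolean isAllowed(TaskStatus from, TaskStatus to) {
        try {
            assertTransition(from, to, TRANSITIONS);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

Because terminal states are simply left out of the map, adding a new state without declaring its transitions makes it terminal by default rather than silently permissive.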

5.2 source_data state machine

public enum SourceStatus {
    PENDING, PREPROCESSING, EXTRACTING, QA_REVIEW, APPROVED, REJECTED;

    public static final Map<SourceStatus, Set<SourceStatus>> TRANSITIONS = Map.of(
        PENDING,       Set.of(EXTRACTING, PREPROCESSING),
        PREPROCESSING, Set.of(PENDING),
        EXTRACTING,    Set.of(QA_REVIEW),
        QA_REVIEW,     Set.of(APPROVED, REJECTED),
        REJECTED,      Set.of(EXTRACTING)   // may be resubmitted after rejection
    );
}

5.3 annotation_task state machine

public enum TaskStatus {
    UNCLAIMED, IN_PROGRESS, SUBMITTED, APPROVED, REJECTED;

    public static final Map<TaskStatus, Set<TaskStatus>> TRANSITIONS = Map.of(
        UNCLAIMED,   Set.of(IN_PROGRESS),
        IN_PROGRESS, Set.of(SUBMITTED, UNCLAIMED, IN_PROGRESS),
        // IN_PROGRESS → IN_PROGRESS covers ADMIN force-reassignment (owner changes, status stays the same)
        SUBMITTED,   Set.of(APPROVED, REJECTED),
        REJECTED,    Set.of(IN_PROGRESS)   // picked up again after rejection
    );
}

5.4 training_dataset state machine

public enum DatasetStatus {
    PENDING_REVIEW, APPROVED, REJECTED;

    public static final Map<DatasetStatus, Set<DatasetStatus>> TRANSITIONS = Map.of(
        PENDING_REVIEW, Set.of(APPROVED, REJECTED),
        REJECTED,       Set.of(PENDING_REVIEW)   // may be edited and resubmitted after rejection
    );
}

5.5 video_process_job state machine

public enum VideoJobStatus {
    PENDING, RUNNING, SUCCESS, FAILED, RETRYING;

    public static final Map<VideoJobStatus, Set<VideoJobStatus>> TRANSITIONS = Map.of(
        PENDING,  Set.of(RUNNING),
        RUNNING,  Set.of(SUCCESS, RETRYING, FAILED),
        RETRYING, Set.of(RUNNING, FAILED)
        // FAILED → PENDING is triggered manually by an ADMIN endpoint, so it is not declared here as an automatic transition
    );
}
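
One way to sanity-check a transition table is to compute its terminal states, meaning the states with no declared outgoing transition, and compare them with what the design intends. The sketch below does this for the video job table above. `TerminalStates.terminal` is a hypothetical helper. Since FAILED → PENDING is a manual ADMIN action and deliberately undeclared, FAILED is reported as terminal alongside SUCCESS.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper for auditing an enum-keyed transition table. */
final class TerminalStates {
    enum VideoJobStatus { PENDING, RUNNING, SUCCESS, FAILED, RETRYING }

    static final Map<VideoJobStatus, Set<VideoJobStatus>> TRANSITIONS = Map.of(
        VideoJobStatus.PENDING,  Set.of(VideoJobStatus.RUNNING),
        VideoJobStatus.RUNNING,  Set.of(VideoJobStatus.SUCCESS, VideoJobStatus.RETRYING, VideoJobStatus.FAILED),
        VideoJobStatus.RETRYING, Set.of(VideoJobStatus.RUNNING, VideoJobStatus.FAILED)
    );

    /** Returns every enum constant that has no outgoing transition in the table. */
    static <E extends Enum<E>> Set<E> terminal(Class<E> type, Map<E, Set<E>> transitions) {
        Set<E> result = EnumSet.noneOf(type);
        for (E state : type.getEnumConstants()) {
            if (transitions.getOrDefault(state, Set.of()).isEmpty()) result.add(state);
        }
        return result;
    }
}
```

A check like this could run as a unit test per status enum, so that an accidentally omitted map entry shows up as an unexpected terminal state.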



6. Docker Compose Configuration

# docker-compose.yml
version: "3.9"

services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: label_db
      POSTGRES_USER: label
      POSTGRES_PASSWORD: label_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./sql/init.sql:/docker-entrypoint-initdb.d/init.sql
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U label -d label_db"]
      interval: 10s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass redis_password
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "-a", "redis_password", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  rustfs:
    image: rustfs/rustfs:latest   # replace with the actual production-ready image
    environment:
      RUSTFS_ACCESS_KEY: minioadmin
      RUSTFS_SECRET_KEY: minioadmin
    volumes:
      - rustfs_data:/data
    ports:
      - "9000:9000"   # S3 API port
      - "9001:9001"   # web console port

  backend:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      SPRING_DATASOURCE_URL: jdbc:postgresql://postgres:5432/label_db
      SPRING_DATASOURCE_USERNAME: label
      SPRING_DATASOURCE_PASSWORD: label_password
      SPRING_REDIS_HOST: redis
      SPRING_REDIS_PORT: 6379
      SPRING_REDIS_PASSWORD: redis_password
      RUSTFS_ENDPOINT: http://rustfs:9000
      RUSTFS_ACCESS_KEY: minioadmin
      RUSTFS_SECRET_KEY: minioadmin
      AI_SERVICE_BASE_URL: http://ai-service:8000
    ports:
      - "8080:8080"
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      rustfs:
        condition: service_started
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/actuator/health"]
      interval: 15s
      retries: 5

  ai-service:
    build:
      context: ../ai_service   # directory containing the Python FastAPI service
      dockerfile: Dockerfile
    environment:
      RUSTFS_ENDPOINT: http://rustfs:9000
      RUSTFS_ACCESS_KEY: minioadmin
      RUSTFS_SECRET_KEY: minioadmin
    ports:
      - "8000:8000"
    depends_on:
      - rustfs

  frontend:
    image: nginx:alpine
    volumes:
      - ../frontend/dist:/usr/share/nginx/html:ro
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    ports:
      - "80:80"
    depends_on:
      backend:
        condition: service_healthy

volumes:
  postgres_data:
  redis_data:
  rustfs_data:

Dockerfile (backend):

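FROM eclipse-temurin:17-jre-alpine
# The backend healthcheck in docker-compose.yml calls curl; install it explicitly in case the base image lacks it
RUN apk add --no-cache curl
WORKDIR /app
COPY target/label-backend-*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]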



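7. Testing Strategy

7.1 Basic principles

  • Integration tests must run against real PostgreSQL and Redis instances (Testcontainers); mocking the database alone is not acceptable
  • Every group of protected endpoints must have at least one integration test covering the Shiro authentication/authorization filter chain (missing Token → 401, insufficient role → 403)

7.2 Concurrent task claim test (required)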

@Test
void concurrentClaimShouldOnlySucceedOnce() throws InterruptedException {
    // Setup: create one UNCLAIMED task
    Long taskId = createUnclaimedTask();
    int threadCount = 10;
    CountDownLatch latch = new CountDownLatch(1);
    AtomicInteger successCount = new AtomicInteger(0);
    ExecutorService pool = Executors.newFixedThreadPool(threadCount);

    for (int i = 0; i < threadCount; i++) {
        pool.submit(() -> {
            try {
                latch.await();
                taskClaimService.claim(taskId);
                successCount.incrementAndGet();
            } catch (BusinessException e) {
                // Expected: the remaining threads throw "task already claimed"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    latch.countDown();
    // shutdown() is required: without it awaitTermination always waits out the full timeout
    pool.shutdown();
    assertThat(pool.awaitTermination(5, SECONDS)).isTrue();

    // Verify: exactly one thread claimed the task
    assertThat(successCount.get()).isEqualTo(1);
    AnnotationTask task = taskMapper.selectById(taskId);
    assertThat(task.getStatus()).isEqualTo("IN_PROGRESS");
    assertThat(task.getClaimedBy()).isNotNull();
}

7.3 Video callback idempotency test (required)

@Test
void duplicateSuccessCallbackShouldBeIdempotent() {
    Long sourceId = createVideoSource();
    VideoProcessJob job = videoProcessService.createJob(sourceId, "FRAME_EXTRACT", params);

    // First success callback
    videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);
    // Duplicate success callback
    videoProcessService.handleCallback(job.getId(), true, "frames/1/0.jpg", null);

    // Verify: only one annotation_task is created
    long taskCount = annotationTaskMapper.countBySourceId(sourceId);
    assertThat(taskCount).isEqualTo(1);
}

7.4 Illegal state transition rejection test

@Test
void illegalStateTransitionShouldThrow() {
    // Verify: APPROVED → IN_PROGRESS is rejected
    assertThatThrownBy(() ->
        StateValidator.assertTransition(TaskStatus.APPROVED, TaskStatus.IN_PROGRESS, TaskStatus.TRANSITIONS)
    ).isInstanceOf(BusinessException.class)
     .hasMessageContaining("非法状态转换");
}

7.5 Multi-tenant isolation test

@Test
void companyACannotAccessCompanyBData() {
    Long sourceIdB = createSourceForCompany(companyB);
    // Request a companyB resource while acting as companyA
    CompanyContext.set(companyA);
    assertThatThrownBy(() -> sourceService.findById(sourceIdB))
        .isInstanceOf(BusinessException.class);
}
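
The compliance checklist in section 8 requires the tenant ThreadLocal to be cleared in a finally block. A minimal sketch of that pattern follows, assuming a scoped helper that binds the company id only for the duration of one call. `CompanyContextSketch` and `callAs` are hypothetical and are not the project's CompanyContext API.

```java
import java.util.function.Supplier;

/** Hypothetical scoped wrapper around a tenant ThreadLocal. */
final class CompanyContextSketch {
    private static final ThreadLocal<Long> CURRENT = new ThreadLocal<>();

    static Long get() {
        return CURRENT.get();
    }

    /** Runs the action with the given company id bound, always clearing it afterwards. */
    static <T> T callAs(Long companyId, Supplier<T> action) {
        CURRENT.set(companyId);
        try {
            return action.get();
        } finally {
            // Clearing here prevents the id from leaking into the next task on a pooled thread.
            CURRENT.remove();
        }
    }
}
```

In a test such as the one above, wrapping the call in a scope like this would keep companyA from remaining bound for whichever test runs next on the same thread.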



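8. Constitution Compliance Checklist

Before a PR is merged, the reviewer must check every item in the list below:

| # | Constitution principle | Implemented in | What to check |
| --- | --- | --- | --- |
| 1 | Environment constraints (JDK 17, Spring Boot 3, Shiro, MyBatis Plus) | pom.xml | Version numbers satisfy the constraints; Spring Security is not pulled in alongside |
| 2 | Multi-tenant data isolation | TenantLineInnerInterceptor, CompanyContext | Every Mapper has company_id injected automatically; the ThreadLocal is cleared in a finally block |
| 3 | BCrypt passwords, UUID Token, sliding expiration, no JWT | AuthService, TokenFilter, RedisKeyManager | No plaintext password storage; every valid request resets the TTL; no JWT library is introduced |
| 4 | Tiered RBAC, permission annotations, cache eviction on role change | UserRealm, @RequiresRoles, UserService | No ad hoc if-role checks; updateRole() deletes the cache immediately |
| 5 | Dual pipelines, cascade-triggered QA task, parent_source_id traceability | ExtractionService.approve() | The cascade runs inside a single @Transactional; parent_source_id is non-null for video-to-text sources |
| 6 | State machine integrity | StateValidator, the *Status enums | Every status change calls StateValidator; no path writes status through a Mapper directly |
| 7 | Two-layer safeguard for task claiming | TaskClaimService.claim() | Redis SET NX and DB WHERE status='UNCLAIMED' are both present |
| 8 | Idempotent async jobs, retry limit, FAILED requires manual reset | VideoProcessService.handleCallback() | Duplicate success callbacks are silently ignored; retry_count >= max_retries sets FAILED |
| 9 | Append-only audit log, AOP aspect, audit failure never rolls back business logic | AuditAspect, @OperationLog | sys_operation_log has no UPDATE/DELETE; audit exceptions are only logged at error level |
| 10 | RESTful URLs, uniform response format, mandatory pagination | Result<T>, all Controllers | No verbs in paths; no bare POJO responses; every list endpoint takes pagination parameters |
| 11 | YAGNI; business logic lives in Service, Controller handles HTTP only | Package structure | Controllers contain no business decisions; no unused abstraction layers |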
