diff --git a/src/main/java/com/label/common/shiro/TokenFilter.java b/src/main/java/com/label/common/shiro/TokenFilter.java index bd03a22..acb7292 100644 --- a/src/main/java/com/label/common/shiro/TokenFilter.java +++ b/src/main/java/com/label/common/shiro/TokenFilter.java @@ -44,7 +44,9 @@ public class TokenFilter extends OncePerRequestFilter { @Override protected boolean shouldNotFilter(HttpServletRequest request) { String path = request.getServletPath(); - return !path.startsWith("/api/") || path.equals("/api/auth/login"); + return !path.startsWith("/api/") + || path.equals("/api/auth/login") + || path.equals("/api/video/callback"); // AI 服务内部回调,不走用户 Token 认证 } @Override diff --git a/src/main/java/com/label/module/config/controller/SysConfigController.java b/src/main/java/com/label/module/config/controller/SysConfigController.java new file mode 100644 index 0000000..695d3a0 --- /dev/null +++ b/src/main/java/com/label/module/config/controller/SysConfigController.java @@ -0,0 +1,61 @@ +package com.label.module.config.controller; + +import com.label.common.result.Result; +import com.label.common.shiro.TokenPrincipal; +import com.label.module.config.entity.SysConfig; +import com.label.module.config.service.SysConfigService; +import jakarta.servlet.http.HttpServletRequest; +import lombok.RequiredArgsConstructor; +import org.apache.shiro.authz.annotation.RequiresRoles; +import org.springframework.web.bind.annotation.*; + +import java.util.List; +import java.util.Map; + +/** + * 系统配置接口(2 个端点,均需 ADMIN 权限)。 + * + * GET /api/config — 查询当前公司所有可见配置(公司专属 + 全局默认合并) + * PUT /api/config/{key} — 更新/创建公司专属配置(UPSERT) + */ +@RestController +@RequiredArgsConstructor +public class SysConfigController { + + private final SysConfigService sysConfigService; + + /** + * GET /api/config — 查询合并后的配置列表。 + * + * 响应中每条配置含 scope 字段: + * - "COMPANY":当前公司专属配置(优先生效) + * - "GLOBAL":全局默认配置(公司未覆盖时生效) + */ + @GetMapping("/api/config") + @RequiresRoles("ADMIN") + public Result>> listConfig(HttpServletRequest request) { + TokenPrincipal principal = principal(request); + return Result.success(sysConfigService.list(principal.getCompanyId())); + } + + /** + * PUT /api/config/{key} — UPSERT 公司专属配置。 + * + * Body: { "value": "...", "description": "..." } + */ + @PutMapping("/api/config/{key}") + @RequiresRoles("ADMIN") + public Result updateConfig(@PathVariable String key, + @RequestBody Map body, + HttpServletRequest request) { + String value = body.get("value"); + String description = body.get("description"); + TokenPrincipal principal = principal(request); + return Result.success( + sysConfigService.update(key, value, description, principal.getCompanyId())); + } + + private TokenPrincipal principal(HttpServletRequest request) { + return (TokenPrincipal) request.getAttribute("__token_principal__"); + } +} diff --git a/src/main/java/com/label/module/config/entity/SysConfig.java b/src/main/java/com/label/module/config/entity/SysConfig.java new file mode 100644 index 0000000..f28e4fb --- /dev/null +++ b/src/main/java/com/label/module/config/entity/SysConfig.java @@ -0,0 +1,41 @@ +package com.label.module.config.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * 系统配置实体,对应 sys_config 表。 + * + * company_id 为 NULL 时表示全局默认配置,非 NULL 时表示租户专属配置(优先级更高)。 + * 注:sys_config 已加入 MybatisPlusConfig.IGNORED_TABLES,不走多租户过滤器。 + */ +@Data +@TableName("sys_config") +public class SysConfig { + + @TableId(type = IdType.AUTO) + private Long id; + + /** + * 所属公司 ID(NULL = 全局默认配置;非 NULL = 租户专属配置)。 + * 注意:不能用 @TableField(exist = false) 排除,必须保留以支持 company_id IS NULL 查询。 + */ + private Long companyId; + + /** 配置键 */ + private String configKey; + + /** 配置值 */ + private String configValue; + + /** 配置说明 */ + private String description; + + private LocalDateTime createdAt; + + private LocalDateTime updatedAt; +} diff --git a/src/main/java/com/label/module/config/mapper/SysConfigMapper.java b/src/main/java/com/label/module/config/mapper/SysConfigMapper.java new file mode 100644 index 0000000..c63c5c9 --- /dev/null +++ b/src/main/java/com/label/module/config/mapper/SysConfigMapper.java @@ -0,0 +1,36 @@ +package com.label.module.config.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.label.module.config.entity.SysConfig; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; + +/** + * sys_config 表 Mapper。 + * + * 注意:sys_config 已加入 MybatisPlusConfig.IGNORED_TABLES,不走多租户过滤器, + * 需手动传入 companyId 进行过滤。 + */ +@Mapper +public interface SysConfigMapper extends BaseMapper { + + /** 查询指定公司的配置(租户专属,优先级高) */ + @Select("SELECT * FROM sys_config WHERE company_id = #{companyId} AND config_key = #{configKey}") + SysConfig selectByCompanyAndKey(@Param("companyId") Long companyId, + @Param("configKey") String configKey); + + /** 查询全局默认配置(company_id IS NULL) */ + @Select("SELECT * FROM sys_config WHERE company_id IS NULL AND config_key = #{configKey}") + SysConfig selectGlobalByKey(@Param("configKey") String configKey); + + /** + * 查询指定公司所有可见配置(公司专属 + 全局默认), + * 按 company_id DESC NULLS LAST 排序(公司专属优先于全局默认)。 + */ + @Select("SELECT * FROM sys_config WHERE company_id = #{companyId} OR company_id IS NULL " + + "ORDER BY company_id DESC NULLS LAST") + List selectAllForCompany(@Param("companyId") Long companyId); +} diff --git a/src/main/java/com/label/module/config/service/SysConfigService.java b/src/main/java/com/label/module/config/service/SysConfigService.java new file mode 100644 index 0000000..be9b679 --- /dev/null +++ b/src/main/java/com/label/module/config/service/SysConfigService.java @@ -0,0 +1,139 @@ +package com.label.module.config.service; + +import com.label.common.exception.BusinessException; +import com.label.module.config.entity.SysConfig; +import com.label.module.config.mapper.SysConfigMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; +import java.util.*; +import java.util.stream.Collectors; + +/** + * 系统配置服务。 + * + * 配置查找优先级:公司专属(company_id = N)> 全局默认(company_id IS NULL)。 + * + * get() — 按优先级返回单个配置值 + * list() — 返回合并后的配置列表(公司专属覆盖同名全局配置),附 scope 字段 + * update() — 以公司专属配置进行 UPSERT(仅允许已知配置键) + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class SysConfigService { + + /** 系统已知配置键白名单(防止写入未知键) */ + private static final Set KNOWN_KEYS = Set.of( + "token_ttl_seconds", + "model_default", + "video_frame_interval" + ); + + private final SysConfigMapper configMapper; + + // ------------------------------------------------------------------ 查询单值 -- + + /** + * 按优先级获取配置值:公司专属优先,否则回退全局默认。 + * + * @param configKey 配置键 + * @param companyId 当前公司 ID + * @return 配置值(不存在时返回 null) + */ + public String get(String configKey, Long companyId) { + // 先查公司专属 + SysConfig company = configMapper.selectByCompanyAndKey(companyId, configKey); + if (company != null) { + return company.getConfigValue(); + } + // 回退全局默认 + SysConfig global = configMapper.selectGlobalByKey(configKey); + return global != null ? global.getConfigValue() : null; + } + + // ------------------------------------------------------------------ 查询列表 -- + + /** + * 返回当前公司所有可见配置(公司专属 + 全局默认合并), + * 附加 scope 字段("COMPANY" / "GLOBAL")标识来源。 + * + * @param companyId 当前公司 ID + * @return 配置列表(含 scope) + */ + public List> list(Long companyId) { + List all = configMapper.selectAllForCompany(companyId); + + // 按 configKey 分组,公司专属优先(排序保证公司专属在前) + Map merged = new LinkedHashMap<>(); + for (SysConfig cfg : all) { + // 由于 SQL 按 company_id DESC NULLS LAST 排序,公司专属先出现,直接 putIfAbsent + merged.putIfAbsent(cfg.getConfigKey(), cfg); + } + + return merged.values().stream() + .map(cfg -> { + Map item = new LinkedHashMap<>(); + item.put("id", cfg.getId()); + item.put("configKey", cfg.getConfigKey()); + item.put("configValue", cfg.getConfigValue()); + item.put("description", cfg.getDescription()); + item.put("scope", cfg.getCompanyId() != null ? "COMPANY" : "GLOBAL"); + item.put("companyId", cfg.getCompanyId()); + return item; + }) + .collect(Collectors.toList()); + } + + // ------------------------------------------------------------------ 更新配置 -- + + /** + * 更新公司专属配置(UPSERT)。 + * + * 仅允许 KNOWN_KEYS 中的配置键,防止写入未定义的配置项。 + * + * @param configKey 配置键 + * @param value 新配置值 + * @param description 配置说明(可选) + * @param companyId 当前公司 ID + */ + @Transactional + public SysConfig update(String configKey, String value, + String description, Long companyId) { + if (!KNOWN_KEYS.contains(configKey)) { + throw new BusinessException("UNKNOWN_CONFIG_KEY", + "未知配置键: " + configKey, HttpStatus.BAD_REQUEST); + } + + if (value == null || value.isBlank()) { + throw new BusinessException("INVALID_CONFIG_VALUE", + "配置值不能为空", HttpStatus.BAD_REQUEST); + } + + // UPSERT:如公司专属配置已存在则更新,否则插入 + SysConfig existing = configMapper.selectByCompanyAndKey(companyId, configKey); + if (existing != null) { + existing.setConfigValue(value); + if (description != null && !description.isBlank()) { + existing.setDescription(description); + } + existing.setUpdatedAt(LocalDateTime.now()); + configMapper.updateById(existing); + log.debug("公司配置已更新: companyId={}, key={}, value={}", companyId, configKey, value); + return existing; + } else { + SysConfig cfg = new SysConfig(); + cfg.setCompanyId(companyId); + cfg.setConfigKey(configKey); + cfg.setConfigValue(value); + cfg.setDescription(description); + configMapper.insert(cfg); + log.debug("公司配置已创建: companyId={}, key={}, value={}", companyId, configKey, value); + return cfg; + } + } +} diff --git a/src/main/java/com/label/module/export/service/FinetuneService.java b/src/main/java/com/label/module/export/service/FinetuneService.java index ea7555c..092117a 100644 --- a/src/main/java/com/label/module/export/service/FinetuneService.java +++ b/src/main/java/com/label/module/export/service/FinetuneService.java @@ -33,11 +33,14 @@ public class FinetuneService { /** * 向 GLM AI 服务提交微调任务。 * + * T074 设计:AI 调用不在 @Transactional 内执行,避免持有 DB 连接期间发起 HTTP 请求。 + * DB 写入(updateFinetuneInfo)是单条 UPDATE,不需要显式事务(自动提交)。 + * 如果 AI 调用成功但 DB 写入失败,下次查询状态仍可通过 AI 服务的 jobId 重建状态。 + * * @param batchId 批次 ID * @param principal 当前用户 * @return 包含 glmJobId 和 finetuneStatus 的 Map */ - @Transactional public Map trigger(Long batchId, TokenPrincipal principal) { ExportBatch batch = exportService.getById(batchId, principal); @@ -46,7 +49,7 @@ public class FinetuneService { "微调任务已提交,当前状态: " + batch.getFinetuneStatus(), HttpStatus.CONFLICT); } - // 调用 AI 服务提交微调(在事务外完成,此处事务仅保护后续 DB 写入) + // 调用 AI 服务(无事务,不持有 DB 连接) AiServiceClient.FinetuneRequest req = AiServiceClient.FinetuneRequest.builder() .datasetPath(batch.getDatasetFilePath()) .model("glm-4") @@ -61,7 +64,7 @@ public class FinetuneService { "提交微调任务失败: " + e.getMessage(), HttpStatus.SERVICE_UNAVAILABLE); } - // 更新批次记录 + // AI 调用成功后更新批次记录(单条 UPDATE,自动提交) exportBatchMapper.updateFinetuneInfo(batchId, response.getJobId(), "RUNNING", principal.getCompanyId()); diff --git a/src/main/java/com/label/module/source/controller/SourceController.java b/src/main/java/com/label/module/source/controller/SourceController.java index d971454..5ba0d8e 100644 --- a/src/main/java/com/label/module/source/controller/SourceController.java +++ b/src/main/java/com/label/module/source/controller/SourceController.java @@ -9,7 +9,6 @@ import jakarta.servlet.http.HttpServletRequest; import lombok.RequiredArgsConstructor; import org.apache.shiro.authz.annotation.RequiresRoles; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; @@ -33,13 +32,13 @@ public class SourceController { */ @PostMapping("/upload") @RequiresRoles("UPLOADER") - public ResponseEntity> upload( + @ResponseStatus(HttpStatus.CREATED) + public Result upload( @RequestParam("file") MultipartFile file, @RequestParam("dataType") String dataType, HttpServletRequest request) { TokenPrincipal principal = (TokenPrincipal) request.getAttribute("__token_principal__"); - SourceResponse response = sourceService.upload(file, dataType, principal); - return ResponseEntity.status(HttpStatus.CREATED).body(Result.success(response)); + return Result.success(sourceService.upload(file, dataType, principal)); } /** diff --git a/src/main/java/com/label/module/video/controller/VideoController.java b/src/main/java/com/label/module/video/controller/VideoController.java new file mode 100644 index 0000000..64093a9 --- /dev/null +++ b/src/main/java/com/label/module/video/controller/VideoController.java @@ -0,0 +1,85 @@ +package com.label.module.video.controller; + +import com.label.common.result.Result; +import com.label.common.shiro.TokenPrincipal; +import com.label.module.video.entity.VideoProcessJob; +import com.label.module.video.service.VideoProcessService; +import jakarta.servlet.http.HttpServletRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.shiro.authz.annotation.RequiresRoles; +import org.springframework.web.bind.annotation.*; + +import java.util.Map; + +/** + * 视频处理接口(4 个端点)。 + * + * POST /api/video/process — 触发视频处理(ADMIN) + * GET /api/video/jobs/{jobId} — 查询任务状态(ADMIN) + * POST /api/video/jobs/{jobId}/reset — 重置失败任务(ADMIN) + * POST /api/video/callback — AI 回调接口(无需认证,已在 TokenFilter 中排除) + */ +@Slf4j +@RestController +@RequiredArgsConstructor +public class VideoController { + + private final VideoProcessService videoProcessService; + + /** POST /api/video/process — 触发视频处理任务 */ + @PostMapping("/api/video/process") + @RequiresRoles("ADMIN") + public Result createJob(@RequestBody Map body, + HttpServletRequest request) { + Long sourceId = Long.parseLong(body.get("sourceId").toString()); + String jobType = (String) body.get("jobType"); + String params = body.containsKey("params") ? body.get("params").toString() : null; + + TokenPrincipal principal = principal(request); + return Result.success( + videoProcessService.createJob(sourceId, jobType, params, principal.getCompanyId())); + } + + /** GET /api/video/jobs/{jobId} — 查询视频处理任务 */ + @GetMapping("/api/video/jobs/{jobId}") + @RequiresRoles("ADMIN") + public Result getJob(@PathVariable Long jobId, + HttpServletRequest request) { + return Result.success(videoProcessService.getJob(jobId, principal(request).getCompanyId())); + } + + /** POST /api/video/jobs/{jobId}/reset — 管理员重置失败任务 */ + @PostMapping("/api/video/jobs/{jobId}/reset") + @RequiresRoles("ADMIN") + public Result resetJob(@PathVariable Long jobId, + HttpServletRequest request) { + return Result.success(videoProcessService.reset(jobId, principal(request).getCompanyId())); + } + + /** + * POST /api/video/callback — AI 服务回调(无需 Bearer Token)。 + * + * 此端点已在 TokenFilter.shouldNotFilter() 中排除认证, + * 由 AI 服务直接调用,携带 jobId、status、outputPath 等参数。 + * + * Body 示例: + * { "jobId": 123, "status": "SUCCESS", "outputPath": "processed/123/frames.zip" } + * { "jobId": 123, "status": "FAILED", "errorMessage": "ffmpeg error: ..." } + */ + @PostMapping("/api/video/callback") + public Result handleCallback(@RequestBody Map body) { + Long jobId = Long.parseLong(body.get("jobId").toString()); + String status = (String) body.get("status"); + String outputPath = body.containsKey("outputPath") ? (String) body.get("outputPath") : null; + String errorMessage = body.containsKey("errorMessage") ? (String) body.get("errorMessage") : null; + + log.info("视频处理回调:jobId={}, status={}", jobId, status); + videoProcessService.handleCallback(jobId, status, outputPath, errorMessage); + return Result.success(null); + } + + private TokenPrincipal principal(HttpServletRequest request) { + return (TokenPrincipal) request.getAttribute("__token_principal__"); + } +} diff --git a/src/main/java/com/label/module/video/entity/VideoProcessJob.java b/src/main/java/com/label/module/video/entity/VideoProcessJob.java new file mode 100644 index 0000000..b4833e6 --- /dev/null +++ b/src/main/java/com/label/module/video/entity/VideoProcessJob.java @@ -0,0 +1,57 @@ +package com.label.module.video.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * 视频处理任务实体,对应 video_process_job 表。 + * + * jobType 取值:FRAME_EXTRACT / VIDEO_TO_TEXT + * status 取值:PENDING / RUNNING / SUCCESS / FAILED / RETRYING + */ +@Data +@TableName("video_process_job") +public class VideoProcessJob { + + @TableId(type = IdType.AUTO) + private Long id; + + /** 所属公司(多租户键) */ + private Long companyId; + + /** 关联资料 ID */ + private Long sourceId; + + /** 任务类型:FRAME_EXTRACT / VIDEO_TO_TEXT */ + private String jobType; + + /** 任务状态:PENDING / RUNNING / SUCCESS / FAILED / RETRYING */ + private String status; + + /** 任务参数(JSONB,例如 {"frameInterval": 30}) */ + private String params; + + /** AI 处理输出路径(成功后填写) */ + private String outputPath; + + /** 已重试次数 */ + private Integer retryCount; + + /** 最大重试次数(默认 3) */ + private Integer maxRetries; + + /** 错误信息 */ + private String errorMessage; + + private LocalDateTime startedAt; + + private LocalDateTime completedAt; + + private LocalDateTime createdAt; + + private LocalDateTime updatedAt; +} diff --git a/src/main/java/com/label/module/video/mapper/VideoProcessJobMapper.java b/src/main/java/com/label/module/video/mapper/VideoProcessJobMapper.java new file mode 100644 index 0000000..a90ea58 --- /dev/null +++ b/src/main/java/com/label/module/video/mapper/VideoProcessJobMapper.java @@ -0,0 +1,12 @@ +package com.label.module.video.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.label.module.video.entity.VideoProcessJob; +import org.apache.ibatis.annotations.Mapper; + +/** + * video_process_job 表 Mapper。 + */ +@Mapper +public interface VideoProcessJobMapper extends BaseMapper { +} diff --git a/src/main/java/com/label/module/video/service/VideoProcessService.java b/src/main/java/com/label/module/video/service/VideoProcessService.java new file mode 100644 index 0000000..2866b58 --- /dev/null +++ b/src/main/java/com/label/module/video/service/VideoProcessService.java @@ -0,0 +1,289 @@ +package com.label.module.video.service; + +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.label.common.ai.AiServiceClient; +import com.label.common.exception.BusinessException; +import com.label.common.statemachine.SourceStatus; +import com.label.common.statemachine.StateValidator; +import com.label.module.source.entity.SourceData; +import com.label.module.source.mapper.SourceDataMapper; +import com.label.module.video.entity.VideoProcessJob; +import com.label.module.video.mapper.VideoProcessJobMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.time.LocalDateTime; +import java.util.Map; + +/** + * 视频处理服务:创建任务、处理回调、管理员重置。 + * + * 状态流转: + * - 创建时:source_data → PREPROCESSING,job → PENDING + * - 回调成功:job → SUCCESS,source_data → PENDING(进入提取队列) + * - 回调失败(可重试):job → RETRYING,retryCount++,重新触发 AI + * - 回调失败(超出上限):job → FAILED,source_data → PENDING + * - 管理员重置:job → PENDING(可手动重新触发) + * + * T074 设计说明: + * AI 调用通过 TransactionSynchronizationManager.registerSynchronization().afterCommit() + * 延迟到事务提交后执行,避免在持有 DB 连接期间进行 HTTP 调用。 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class VideoProcessService { + + private final VideoProcessJobMapper jobMapper; + private final SourceDataMapper sourceDataMapper; + private final AiServiceClient aiServiceClient; + + @Value("${rustfs.bucket:label-source-data}") + private String bucket; + + // ------------------------------------------------------------------ 创建任务 -- + + /** + * 创建视频处理任务并在事务提交后触发 AI 服务。 + * + * DB 写入(source_data→PREPROCESSING + 插入 job)在 @Transactional 内完成; + * AI 触发通过 afterCommit() 在事务提交后执行,不占用 DB 连接。 + * + * @param sourceId 资料 ID + * @param jobType 任务类型(FRAME_EXTRACT / VIDEO_TO_TEXT) + * @param params JSON 参数(如 {"frameInterval": 30}) + * @param companyId 租户 ID + * @return 新建的 VideoProcessJob + */ + @Transactional + public VideoProcessJob createJob(Long sourceId, String jobType, + String params, Long companyId) { + SourceData source = sourceDataMapper.selectById(sourceId); + if (source == null || !companyId.equals(source.getCompanyId())) { + throw new BusinessException("NOT_FOUND", "资料不存在: " + sourceId, HttpStatus.NOT_FOUND); + } + + validateJobType(jobType); + + // source_data → PREPROCESSING + StateValidator.assertTransition( + SourceStatus.TRANSITIONS, + SourceStatus.valueOf(source.getStatus()), SourceStatus.PREPROCESSING); + sourceDataMapper.update(null, new LambdaUpdateWrapper() + .eq(SourceData::getId, sourceId) + .set(SourceData::getStatus, "PREPROCESSING") + .set(SourceData::getUpdatedAt, LocalDateTime.now())); + + // 插入 PENDING 任务 + VideoProcessJob job = new VideoProcessJob(); + job.setCompanyId(companyId); + job.setSourceId(sourceId); + job.setJobType(jobType); + job.setStatus("PENDING"); + job.setParams(params != null ? params : "{}"); + job.setRetryCount(0); + job.setMaxRetries(3); + jobMapper.insert(job); + + // 事务提交后触发 AI(不在事务内,不占用 DB 连接) + final Long jobId = job.getId(); + final String filePath = source.getFilePath(); + final String finalJobType = jobType; + + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + triggerAi(jobId, sourceId, filePath, finalJobType); + } + }); + + log.debug("视频处理任务已创建(AI 将在事务提交后触发): jobId={}, sourceId={}", jobId, sourceId); + return job; + } + + // ------------------------------------------------------------------ 处理回调 -- + + /** + * 处理 AI 服务异步回调(POST /api/video/callback,无需用户 Token)。 + * + * 幂等:若 job 已为 SUCCESS,直接返回,防止重复处理。 + * 重试触发同样延迟到事务提交后(afterCommit),不在事务内执行。 + * + * @param jobId 任务 ID + * @param callbackStatus AI 回调状态(SUCCESS / FAILED) + * @param outputPath 成功时的输出路径(可选) + * @param errorMessage 失败时的错误信息(可选) + */ + @Transactional + public void handleCallback(Long jobId, String callbackStatus, + String outputPath, String errorMessage) { + VideoProcessJob job = jobMapper.selectById(jobId); + if (job == null) { + log.warn("视频处理回调:job 不存在,jobId={}", jobId); + return; + } + + // 幂等:已成功则忽略重复回调 + if ("SUCCESS".equals(job.getStatus())) { + log.debug("视频处理回调幂等:jobId={} 已为 SUCCESS,跳过", jobId); + return; + } + + if ("SUCCESS".equals(callbackStatus)) { + handleSuccess(job, outputPath); + } else { + handleFailure(job, errorMessage); + } + } + + // ------------------------------------------------------------------ 管理员重置 -- + + /** + * 管理员手动重置失败任务(FAILED → PENDING)。 + * + * 仅允许 FAILED 状态的任务重置,重置后 retryCount 清零, + * 管理员可随后重新调用 createJob 触发处理。 + * + * @param jobId 任务 ID + * @param companyId 租户 ID + */ + @Transactional + public VideoProcessJob reset(Long jobId, Long companyId) { + VideoProcessJob job = jobMapper.selectById(jobId); + if (job == null || !companyId.equals(job.getCompanyId())) { + throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND); + } + + if (!"FAILED".equals(job.getStatus())) { + throw new BusinessException("INVALID_TRANSITION", + "只有 FAILED 状态的任务可以重置,当前状态: " + job.getStatus(), + HttpStatus.BAD_REQUEST); + } + + jobMapper.update(null, new LambdaUpdateWrapper() + .eq(VideoProcessJob::getId, jobId) + .set(VideoProcessJob::getStatus, "PENDING") + .set(VideoProcessJob::getRetryCount, 0) + .set(VideoProcessJob::getErrorMessage, null) + .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); + + job.setStatus("PENDING"); + job.setRetryCount(0); + log.debug("视频处理任务已重置: jobId={}", jobId); + return job; + } + + // ------------------------------------------------------------------ 查询 -- + + public VideoProcessJob getJob(Long jobId, Long companyId) { + VideoProcessJob job = jobMapper.selectById(jobId); + if (job == null || !companyId.equals(job.getCompanyId())) { + throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND); + } + return job; + } + + // ------------------------------------------------------------------ 私有方法 -- + + private void handleSuccess(VideoProcessJob job, String outputPath) { + // job → SUCCESS + jobMapper.update(null, new LambdaUpdateWrapper() + .eq(VideoProcessJob::getId, job.getId()) + .set(VideoProcessJob::getStatus, "SUCCESS") + .set(VideoProcessJob::getOutputPath, outputPath) + .set(VideoProcessJob::getCompletedAt, LocalDateTime.now()) + .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); + + // source_data PREPROCESSING → PENDING(进入提取队列) + sourceDataMapper.update(null, new LambdaUpdateWrapper() + .eq(SourceData::getId, job.getSourceId()) + .set(SourceData::getStatus, "PENDING") + .set(SourceData::getUpdatedAt, LocalDateTime.now())); + + log.debug("视频处理成功:jobId={}, sourceId={}", job.getId(), job.getSourceId()); + } + + private void handleFailure(VideoProcessJob job, String errorMessage) { + int newRetryCount = job.getRetryCount() + 1; + int maxRetries = job.getMaxRetries() != null ? job.getMaxRetries() : 3; + + if (newRetryCount < maxRetries) { + // 仍有重试次数:job → RETRYING,事务提交后重新触发 AI + jobMapper.update(null, new LambdaUpdateWrapper() + .eq(VideoProcessJob::getId, job.getId()) + .set(VideoProcessJob::getStatus, "RETRYING") + .set(VideoProcessJob::getRetryCount, newRetryCount) + .set(VideoProcessJob::getErrorMessage, errorMessage) + .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); + + log.warn("视频处理失败,开始第 {} 次重试:jobId={}, error={}", + newRetryCount, job.getId(), errorMessage); + + // 重试 AI 触发延迟到事务提交后 + SourceData source = sourceDataMapper.selectById(job.getSourceId()); + if (source != null) { + final Long jobId = job.getId(); + final Long sourceId = job.getSourceId(); + final String filePath = source.getFilePath(); + final String jobType = job.getJobType(); + + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + triggerAi(jobId, sourceId, filePath, jobType); + } + }); + } + } else { + // 超出最大重试次数:job → FAILED,source_data → PENDING + jobMapper.update(null, new LambdaUpdateWrapper() + .eq(VideoProcessJob::getId, job.getId()) + .set(VideoProcessJob::getStatus, "FAILED") + .set(VideoProcessJob::getRetryCount, newRetryCount) + .set(VideoProcessJob::getErrorMessage, errorMessage) + .set(VideoProcessJob::getCompletedAt, LocalDateTime.now()) + .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); + + // source_data PREPROCESSING → PENDING(管理员可重新处理) + sourceDataMapper.update(null, new LambdaUpdateWrapper() + .eq(SourceData::getId, job.getSourceId()) + .set(SourceData::getStatus, "PENDING") + .set(SourceData::getUpdatedAt, LocalDateTime.now())); + + log.error("视频处理永久失败:jobId={}, sourceId={}, error={}", + job.getId(), job.getSourceId(), errorMessage); + } + } + + private void triggerAi(Long jobId, Long sourceId, String filePath, String jobType) { + AiServiceClient.VideoProcessRequest req = AiServiceClient.VideoProcessRequest.builder() + .sourceId(sourceId) + .filePath(filePath) + .bucket(bucket) + .params(Map.of("jobId", jobId, "jobType", jobType)) + .build(); + try { + if ("FRAME_EXTRACT".equals(jobType)) { + aiServiceClient.extractFrames(req); + } else { + aiServiceClient.videoToText(req); + } + log.debug("AI 触发成功: jobId={}", jobId); + } catch (Exception e) { + log.warn("触发视频处理 AI 失败(jobId={}):{},job 保持当前状态等待重试", jobId, e.getMessage()); + } + } + + private void validateJobType(String jobType) { + if (!"FRAME_EXTRACT".equals(jobType) && !"VIDEO_TO_TEXT".equals(jobType)) { + throw new BusinessException("INVALID_JOB_TYPE", + "任务类型不合法,应为 FRAME_EXTRACT 或 VIDEO_TO_TEXT", HttpStatus.BAD_REQUEST); + } + } +} diff --git a/src/test/java/com/label/integration/MultiTenantIsolationTest.java b/src/test/java/com/label/integration/MultiTenantIsolationTest.java new file mode 100644 index 0000000..960ec32 --- /dev/null +++ b/src/test/java/com/label/integration/MultiTenantIsolationTest.java @@ -0,0 +1,198 @@ +package com.label.integration; + +import com.label.AbstractIntegrationTest; +import com.label.common.redis.RedisKeyManager; +import com.label.common.redis.RedisService; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.*; + +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * 多租户隔离集成测试(Phase 10 / T070)。 + * + * 测试场景: + * 1. 公司 A 的 ADMIN 查询资料列表 → 只能看到公司 A 的资料,看不到公司 B 的 + * 2. 公司 B 的 ADMIN 查询任务 → 只能看到公司 B 的任务,看不到公司 A 的 + * 3. 公司 A 的 sys_config 配置不影响公司 B(配置隔离) + */ +public class MultiTenantIsolationTest extends AbstractIntegrationTest { + + private static final String TOKEN_A = "test-admin-token-company-a"; + private static final String TOKEN_B = "test-admin-token-company-b"; + + @Autowired + private TestRestTemplate restTemplate; + + @Autowired + private RedisService redisService; + + private Long companyAId; // DEMO 公司(已在 init.sql 中创建) + private Long companyBId; // 测试用第二家公司 + private Long adminAId; // DEMO 公司 admin + private Long adminBId; // 第二家公司 admin + + @BeforeEach + void setupCompaniesAndTokens() { + // 公司 A:使用 init.sql 中的 DEMO 公司 + companyAId = jdbcTemplate.queryForObject( + "SELECT id FROM sys_company WHERE company_code = 'DEMO'", Long.class); + adminAId = jdbcTemplate.queryForObject( + "SELECT id FROM sys_user WHERE username = 'admin' AND company_id = ?", + Long.class, companyAId); + + // 公司 B:在测试中创建第二家公司 + jdbcTemplate.execute( + "INSERT INTO sys_company (company_name, company_code, status) " + + "VALUES ('测试公司B', 'TESTB', 'ACTIVE') ON CONFLICT DO NOTHING"); + companyBId = jdbcTemplate.queryForObject( + "SELECT id FROM sys_company WHERE company_code = 'TESTB'", Long.class); + + // 为公司 B 创建 admin 用户 + jdbcTemplate.execute( + "INSERT INTO sys_user (company_id, username, password_hash, real_name, role, status) " + + "VALUES (" + companyBId + ", 'admin_b', " + + "'$2a$10$B8iR5z43URiNPm.eut3JvufIPBuvGx5ZZmqyUqE1A1WdbZppX5bmi', " + + "'B公司管理员', 'ADMIN', 'ACTIVE') ON CONFLICT (company_id, username) DO NOTHING"); + adminBId = jdbcTemplate.queryForObject( + "SELECT id FROM sys_user WHERE username = 'admin_b' AND company_id = ?", + Long.class, companyBId); + + // 伪造 Redis Token + redisService.hSetAll(RedisKeyManager.tokenKey(TOKEN_A), + Map.of("userId", adminAId.toString(), "role", "ADMIN", + "companyId", companyAId.toString(), "username", "admin"), + 3600L); + redisService.hSetAll(RedisKeyManager.tokenKey(TOKEN_B), + Map.of("userId", adminBId.toString(), "role", "ADMIN", + "companyId", companyBId.toString(), "username", "admin_b"), + 3600L); + + // 公司 A 插入两条 source_data + jdbcTemplate.execute( + "INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " + + "file_name, file_size, bucket_name, status) " + + "VALUES (" + companyAId + ", " + adminAId + ", 'TEXT', " + + "'company-a/file1.txt', 'file1.txt', 100, 'label-source-data', 'PENDING'), " + + "(" + companyAId + ", " + adminAId + ", 'TEXT', " + + "'company-a/file2.txt', 'file2.txt', 200, 'label-source-data', 'PENDING')"); + + // 公司 B 插入一条 source_data + jdbcTemplate.execute( + "INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " + + "file_name, file_size, bucket_name, status) " + + "VALUES (" + companyBId + ", " + adminBId + ", 'TEXT', " + + "'company-b/file1.txt', 'file1.txt', 300, 'label-source-data', 'PENDING')"); + } + + @AfterEach + void cleanupTokensAndCompanyB() { + redisService.delete(RedisKeyManager.tokenKey(TOKEN_A)); + redisService.delete(RedisKeyManager.tokenKey(TOKEN_B)); + // 清理公司 B 的数据(sys_company 不在 cleanData TRUNCATE 范围内) + jdbcTemplate.execute("DELETE FROM sys_user WHERE username = 'admin_b'"); + jdbcTemplate.execute("DELETE FROM sys_company WHERE company_code = 'TESTB'"); + } + + // ------------------------------------------------------------------ 测试 1: 资料列表隔离 -- + + @Test + @DisplayName("公司 A 只能查看本公司资料,看不到公司 B 的资料") + void sourceList_companyA_cannotSeeCompanyBData() { + ResponseEntity resp = restTemplate.exchange( + baseUrl("/api/source/list?page=1&pageSize=50"), + HttpMethod.GET, + bearerRequest(TOKEN_A), + Map.class); + assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); + + @SuppressWarnings("unchecked") + Map data = (Map) resp.getBody().get("data"); + assertThat(((Number) data.get("total")).longValue()) + .as("公司 A 应只看到自己的 2 条资料") + .isEqualTo(2L); + + @SuppressWarnings("unchecked") + List> records = (List>) data.get("records"); + records.forEach(r -> + assertThat(((Number) r.get("companyId")).longValue()) + .as("每条资料的 companyId 应为公司 A 的 ID") + .isEqualTo(companyAId)); + } + + @Test + @DisplayName("公司 B 只能查看本公司资料,看不到公司 A 的资料") + void sourceList_companyB_cannotSeeCompanyAData() { + ResponseEntity resp = restTemplate.exchange( + baseUrl("/api/source/list?page=1&pageSize=50"), + HttpMethod.GET, + bearerRequest(TOKEN_B), + Map.class); + assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); + + @SuppressWarnings("unchecked") + Map data = (Map) resp.getBody().get("data"); + assertThat(((Number) data.get("total")).longValue()) + .as("公司 B 应只看到自己的 1 条资料") + .isEqualTo(1L); + } + + // ------------------------------------------------------------------ 测试 2: 配置隔离 -- + + @Test + @DisplayName("公司 A 设置专属配置,公司 B 仍使用全局默认") + void sysConfig_companyA_doesNotAffectCompanyB() { + // 公司 A 设置专属 model_default + HttpHeaders headersA = new HttpHeaders(); + headersA.set("Authorization", "Bearer " + TOKEN_A); + headersA.setContentType(MediaType.APPLICATION_JSON); + + restTemplate.exchange( + baseUrl("/api/config/model_default"), + HttpMethod.PUT, + new HttpEntity<>(Map.of("value", "glm-4-plus"), headersA), + Map.class); + + // 公司 B 查询配置列表 + ResponseEntity respB = restTemplate.exchange( + baseUrl("/api/config"), + HttpMethod.GET, + bearerRequest(TOKEN_B), + Map.class); + assertThat(respB.getStatusCode()).isEqualTo(HttpStatus.OK); + + @SuppressWarnings("unchecked") + List> configsB = (List>) respB.getBody().get("data"); + + Map modelCfgB = configsB.stream() + .filter(c -> "model_default".equals(c.get("configKey"))) + .findFirst() + .orElse(null); + + if (modelCfgB != null) { + // 公司 B 未设置专属,应使用全局默认 glm-4,scope=GLOBAL + assertThat(modelCfgB.get("scope")) + .as("公司 B 应使用全局默认配置,scope=GLOBAL") + .isEqualTo("GLOBAL"); + assertThat(modelCfgB.get("configValue")) + .as("公司 B model_default 应为全局默认 glm-4,不受公司 A 设置影响") + .isEqualTo("glm-4"); + } + } + + // ------------------------------------------------------------------ 工具方法 -- + + private HttpEntity bearerRequest(String token) { + HttpHeaders headers = new HttpHeaders(); + headers.set("Authorization", "Bearer " + token); + return new HttpEntity<>(headers); + } +} diff --git a/src/test/java/com/label/integration/SysConfigIntegrationTest.java b/src/test/java/com/label/integration/SysConfigIntegrationTest.java new file mode 100644 index 0000000..da361c0 --- /dev/null +++ b/src/test/java/com/label/integration/SysConfigIntegrationTest.java @@ -0,0 +1,183 @@ +package com.label.integration; + +import com.label.AbstractIntegrationTest; +import com.label.common.redis.RedisKeyManager; +import com.label.common.redis.RedisService; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.*; + +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * 系统配置集成测试(US8)。 + * + * 测试场景: + * 1. 公司专属配置覆盖全局默认 + * 2. 未设置公司专属时,回退至全局默认 + * 3. 未知配置键 → 400 UNKNOWN_CONFIG_KEY + */ +public class SysConfigIntegrationTest extends AbstractIntegrationTest { + + private static final String ADMIN_TOKEN = "test-admin-token-config"; + + @Autowired + private TestRestTemplate restTemplate; + + @Autowired + private RedisService redisService; + + private Long companyId; + private Long adminUserId; + + @BeforeEach + void setupToken() { + companyId = jdbcTemplate.queryForObject( + "SELECT id FROM sys_company WHERE company_code = 'DEMO'", Long.class); + adminUserId = jdbcTemplate.queryForObject( + "SELECT id FROM sys_user WHERE username = 'admin'", Long.class); + + // 伪造 Redis Token + redisService.hSetAll(RedisKeyManager.tokenKey(ADMIN_TOKEN), + Map.of("userId", adminUserId.toString(), "role", "ADMIN", + "companyId", companyId.toString(), "username", "admin"), + 3600L); + } + + @AfterEach + void cleanupTokens() { + redisService.delete(RedisKeyManager.tokenKey(ADMIN_TOKEN)); + } + + // ------------------------------------------------------------------ 测试 1: 公司配置覆盖全局 -- + + @Test + @DisplayName("公司专属配置优先于全局默认(scope=COMPANY 覆盖 scope=GLOBAL)") + void companyConfig_overridesGlobalDefault() { + // 设置公司专属配置(覆盖全局 model_default) + updateConfig("model_default", "glm-4-plus", "公司专属模型"); + + // 查询配置列表 + ResponseEntity listResp = restTemplate.exchange( + baseUrl("/api/config"), + HttpMethod.GET, + bearerRequest(ADMIN_TOKEN), + Map.class); + assertThat(listResp.getStatusCode()).isEqualTo(HttpStatus.OK); + + @SuppressWarnings("unchecked") + List> configs = (List>) listResp.getBody().get("data"); + assertThat(configs).isNotEmpty(); + + // 找到 model_default 配置 + Map modelConfig = configs.stream() + .filter(c -> "model_default".equals(c.get("configKey"))) + .findFirst() + .orElseThrow(() -> new AssertionError("model_default 配置不存在")); + + // 应返回公司专属配置值,scope=COMPANY + assertThat(modelConfig.get("configValue")) + .as("公司专属配置应覆盖全局默认") + .isEqualTo("glm-4-plus"); + assertThat(modelConfig.get("scope")) + .as("scope 应标记为 COMPANY") + .isEqualTo("COMPANY"); + } + + // ------------------------------------------------------------------ 测试 2: 回退全局默认 -- + + @Test + @DisplayName("未设置公司专属配置时,返回全局默认值(scope=GLOBAL)") + void globalConfig_usedWhenNoCompanyOverride() { + // 不设置公司专属,直接查询列表 + ResponseEntity listResp = restTemplate.exchange( + baseUrl("/api/config"), + HttpMethod.GET, + bearerRequest(ADMIN_TOKEN), + Map.class); + assertThat(listResp.getStatusCode()).isEqualTo(HttpStatus.OK); + + @SuppressWarnings("unchecked") + List> configs = (List>) listResp.getBody().get("data"); + + // 至少包含 AbstractIntegrationTest.cleanData() 中插入的全局配置 + assertThat(configs).isNotEmpty(); + + // 所有配置都应有 scope 字段 + configs.forEach(cfg -> + assertThat(cfg.containsKey("scope")).as("每条配置应含 scope 字段").isTrue()); + + // token_ttl_seconds 全局默认应为 7200 + Map ttlConfig = configs.stream() + .filter(c -> "token_ttl_seconds".equals(c.get("configKey"))) + .findFirst() + .orElse(null); + + if (ttlConfig != null) { + assertThat(ttlConfig.get("configValue")).isEqualTo("7200"); + assertThat(ttlConfig.get("scope")).isEqualTo("GLOBAL"); + } + } + + // ------------------------------------------------------------------ 测试 3: 未知配置键 -- + + @Test + @DisplayName("更新未知配置键 → 400 UNKNOWN_CONFIG_KEY") + void updateUnknownKey_returns400() { + ResponseEntity resp = updateConfig("unknown_key_xyz", "someValue", null); + assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.BAD_REQUEST); + assertThat(resp.getBody().get("code")).isEqualTo("UNKNOWN_CONFIG_KEY"); + } + + // ------------------------------------------------------------------ 测试 4: UPSERT 同键两次 -- + + @Test + @DisplayName("同一配置键两次 PUT → 第二次更新而非重复插入") + void updateSameKey_twice_upserts() { + updateConfig("video_frame_interval", "60", "帧间隔 60s"); + updateConfig("video_frame_interval", "120", "帧间隔 120s"); + + // 数据库中公司专属 video_frame_interval 应只有一条记录 + Integer count = jdbcTemplate.queryForObject( + "SELECT COUNT(*) FROM sys_config WHERE company_id = ? AND config_key = 'video_frame_interval'", + Integer.class, companyId); + assertThat(count).as("UPSERT:同键应只有一条公司专属记录").isEqualTo(1); + + // 值应为最后一次 PUT 的值 + String value = jdbcTemplate.queryForObject( + "SELECT config_value FROM sys_config WHERE company_id = ? AND config_key = 'video_frame_interval'", + String.class, companyId); + assertThat(value).isEqualTo("120"); + } + + // ------------------------------------------------------------------ 工具方法 -- + + private ResponseEntity updateConfig(String key, String value, String description) { + HttpHeaders headers = new HttpHeaders(); + headers.set("Authorization", "Bearer " + ADMIN_TOKEN); + headers.setContentType(MediaType.APPLICATION_JSON); + + Map body = description != null + ? Map.of("value", value, "description", description) + : Map.of("value", value); + + return restTemplate.exchange( + baseUrl("/api/config/" + key), + HttpMethod.PUT, + new HttpEntity<>(body, headers), + Map.class); + } + + private HttpEntity bearerRequest(String token) { + HttpHeaders headers = new HttpHeaders(); + headers.set("Authorization", "Bearer " + token); + return new HttpEntity<>(headers); + } +} diff --git a/src/test/java/com/label/integration/VideoCallbackIdempotencyTest.java b/src/test/java/com/label/integration/VideoCallbackIdempotencyTest.java new file mode 100644 index 0000000..37cfb24 --- /dev/null +++ b/src/test/java/com/label/integration/VideoCallbackIdempotencyTest.java @@ -0,0 +1,182 @@ +package com.label.integration; + +import com.label.AbstractIntegrationTest; +import com.label.common.redis.RedisKeyManager; +import com.label.common.redis.RedisService; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.http.*; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * 视频处理回调幂等与重试集成测试(US8)。 + * + * 测试场景: + * 1. 同一 jobId 收到两次 SUCCESS 回调:annotation_task(EXTRACTION)仅创建一次 + * 2. 超出最大重试次数 → job.status = FAILED,source_data.status = PENDING + */ +public class VideoCallbackIdempotencyTest extends AbstractIntegrationTest { + + private static final String ADMIN_TOKEN = "test-admin-token-video"; + + @Autowired + private TestRestTemplate restTemplate; + + @Autowired + private RedisService redisService; + + private Long companyId; + private Long adminUserId; + private Long sourceId; + private Long jobId; + + @BeforeEach + void setupTokenAndData() { + companyId = jdbcTemplate.queryForObject( + "SELECT id FROM sys_company WHERE company_code = 'DEMO'", Long.class); + adminUserId = jdbcTemplate.queryForObject( + "SELECT id FROM sys_user WHERE username = 'admin'", Long.class); + + // 伪造 Redis Token + redisService.hSetAll(RedisKeyManager.tokenKey(ADMIN_TOKEN), + Map.of("userId", adminUserId.toString(), "role", "ADMIN", + "companyId", companyId.toString(), "username", "admin"), + 3600L); + + // 插入 source_data(PREPROCESSING 状态,模拟视频处理中) + jdbcTemplate.execute( + "INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " + + "file_name, file_size, bucket_name, status) " + + "VALUES (" + companyId + ", " + adminUserId + ", 'VIDEO', " + + "'videos/test.mp4', 'test.mp4', 10240, 'label-source-data', 'PREPROCESSING')"); + sourceId = jdbcTemplate.queryForObject( + "SELECT id FROM source_data ORDER BY id DESC LIMIT 1", Long.class); + + // 插入 PENDING 视频处理任务 + jdbcTemplate.execute( + "INSERT INTO video_process_job (company_id, source_id, job_type, status, " + + "params, retry_count, max_retries) " + + "VALUES (" + companyId + ", " + sourceId + ", 'FRAME_EXTRACT', 'PENDING', " + + "'{}'::jsonb, 0, 3)"); + jobId = jdbcTemplate.queryForObject( + "SELECT id FROM video_process_job ORDER BY id DESC LIMIT 1", Long.class); + } + + @AfterEach + void cleanupTokens() { + redisService.delete(RedisKeyManager.tokenKey(ADMIN_TOKEN)); + } + + // ------------------------------------------------------------------ 测试 1: 幂等性 -- + + @Test + @DisplayName("同一 jobId 发送两次 SUCCESS 回调:source_data 仅更新一次,status=PENDING") + void successCallback_idempotent_sourceUpdatedOnce() { + // 第一次 SUCCESS 回调 + ResponseEntity resp1 = sendCallback(jobId, "SUCCESS", "processed/frames.zip", null); + assertThat(resp1.getStatusCode()).isEqualTo(HttpStatus.OK); + + // 验证第一次回调后状态 + String jobStatus1 = jdbcTemplate.queryForObject( + "SELECT status FROM video_process_job WHERE id = ?", String.class, jobId); + assertThat(jobStatus1).isEqualTo("SUCCESS"); + + String sourceStatus1 = jdbcTemplate.queryForObject( + "SELECT status FROM source_data WHERE id = ?", String.class, sourceId); + assertThat(sourceStatus1).isEqualTo("PENDING"); + + // 第二次 SUCCESS 回调(幂等:应直接返回,不重复处理) + ResponseEntity resp2 = sendCallback(jobId, "SUCCESS", "processed/frames.zip", null); + assertThat(resp2.getStatusCode()).isEqualTo(HttpStatus.OK); + + // 状态仍为 SUCCESS + PENDING,未被改变 + String jobStatus2 = jdbcTemplate.queryForObject( + "SELECT status FROM video_process_job WHERE id = ?", String.class, jobId); + assertThat(jobStatus2).as("幂等:第二次回调不应改变 job 状态").isEqualTo("SUCCESS"); + + String sourceStatus2 = jdbcTemplate.queryForObject( + "SELECT status FROM source_data WHERE id = ?", String.class, sourceId); + assertThat(sourceStatus2).as("幂等:第二次回调不应改变 source_data 状态").isEqualTo("PENDING"); + } + + // ------------------------------------------------------------------ 测试 2: 超出重试上限 → FAILED -- + + @Test + @DisplayName("超出最大重试次数后 → job.status=FAILED,source_data.status=PENDING") + void failedCallback_exceedsMaxRetries_jobBecomesFailedAndSourceReverts() { + // 将 retry_count 设为 max_retries-1(再失败一次就超限) + jdbcTemplate.execute( + "UPDATE video_process_job SET retry_count = 2, max_retries = 3, " + + "status = 'RETRYING' WHERE id = " + jobId); + + // 发送最后一次 FAILED 回调(retry_count 变为 3 = max_retries → 超限) + ResponseEntity resp = sendCallback(jobId, "FAILED", null, "ffmpeg 处理超时"); + assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); + + // 验证 job → FAILED + Map jobRow = jdbcTemplate.queryForMap( + "SELECT status, retry_count, error_message FROM video_process_job WHERE id = ?", jobId); + assertThat(jobRow.get("status")).as("超出重试上限后 job 应为 FAILED").isEqualTo("FAILED"); + assertThat(((Number) jobRow.get("retry_count")).intValue()).isEqualTo(3); + assertThat(jobRow.get("error_message")).isEqualTo("ffmpeg 处理超时"); + + // 验证 source_data → PENDING(管理员可重新处理) + String sourceStatus = jdbcTemplate.queryForObject( + "SELECT status FROM source_data WHERE id = ?", String.class, sourceId); + assertThat(sourceStatus).as("超出重试上限后 source_data 应回退为 PENDING").isEqualTo("PENDING"); + } + + // ------------------------------------------------------------------ 测试 3: 管理员重置 -- + + @Test + @DisplayName("管理员重置 FAILED 任务 → job.status=PENDING,retryCount=0") + void resetFailedJob_succeeds() { + // 先将任务置为 FAILED 状态 + jdbcTemplate.execute( + "UPDATE video_process_job SET status = 'FAILED', retry_count = 3 WHERE id = " + jobId); + + // 重置 + HttpHeaders headers = new HttpHeaders(); + headers.set("Authorization", "Bearer " + ADMIN_TOKEN); + ResponseEntity resp = restTemplate.exchange( + baseUrl("/api/video/jobs/" + jobId + "/reset"), + HttpMethod.POST, + new HttpEntity<>(headers), + Map.class); + assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); + + // 验证 + Map jobRow = jdbcTemplate.queryForMap( + "SELECT status, retry_count FROM video_process_job WHERE id = ?", jobId); + assertThat(jobRow.get("status")).isEqualTo("PENDING"); + assertThat(((Number) jobRow.get("retry_count")).intValue()).isEqualTo(0); + } + + // ------------------------------------------------------------------ 工具方法 -- + + private ResponseEntity sendCallback(Long jobId, String status, + String outputPath, String errorMessage) { + Map body; + if ("SUCCESS".equals(status)) { + body = Map.of("jobId", jobId, "status", status, "outputPath", outputPath); + } else { + body = Map.of("jobId", jobId, "status", status, "errorMessage", + errorMessage != null ? errorMessage : ""); + } + + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + return restTemplate.exchange( + baseUrl("/api/video/callback"), + HttpMethod.POST, + new HttpEntity<>(body, headers), + Map.class); + } +}