feat(phase9-10): US8 视频处理与系统配置模块 + 代码审查修复

Phase 9 (US8):
- VideoProcessJob 实体 + VideoProcessJobMapper
- SysConfig 实体 + SysConfigMapper(手动多租户查询)
- VideoProcessService:createJob/handleCallback(幂等)/reset
  - T074 修复:AI 触发通过 TransactionSynchronization.afterCommit() 延迟至事务提交后
- VideoController:4 个端点,/api/video/callback 无需认证
- SysConfigService:公司专属优先 > 全局默认回退,UPSERT 仅允许已知键
- SysConfigController:GET /api/config + PUT /api/config/{key}
- TokenFilter:/api/video/callback 绕过 Token 认证
- 集成测试:VideoCallbackIdempotencyTest、SysConfigIntegrationTest

Phase 10 (代码审查与修复):
- T070 MultiTenantIsolationTest:跨公司资料/配置隔离验证
- T071 SourceController.upload():ResponseEntity<Result<T>> → Result<T> + @ResponseStatus
- T074 FinetuneService.trigger():移除 @Transactional,AI 调用在事务外执行
This commit is contained in:
wh
2026-04-09 16:18:39 +08:00
parent f6c3b0b4c6
commit a14c3f5559
14 changed files with 1295 additions and 8 deletions

View File

@@ -44,7 +44,9 @@ public class TokenFilter extends OncePerRequestFilter {
@Override
protected boolean shouldNotFilter(HttpServletRequest request) {
String path = request.getServletPath();
return !path.startsWith("/api/") || path.equals("/api/auth/login");
return !path.startsWith("/api/")
|| path.equals("/api/auth/login")
|| path.equals("/api/video/callback"); // AI 服务内部回调,不走用户 Token 认证
}
@Override

View File

@@ -0,0 +1,61 @@
package com.label.module.config.controller;
import com.label.common.result.Result;
import com.label.common.shiro.TokenPrincipal;
import com.label.module.config.entity.SysConfig;
import com.label.module.config.service.SysConfigService;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.shiro.authz.annotation.RequiresRoles;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
/**
* 系统配置接口2 个端点,均需 ADMIN 权限)。
*
* GET /api/config — 查询当前公司所有可见配置(公司专属 + 全局默认合并)
* PUT /api/config/{key} — 更新/创建公司专属配置UPSERT
*/
@RestController
@RequiredArgsConstructor
public class SysConfigController {
private final SysConfigService sysConfigService;
/**
* GET /api/config — 查询合并后的配置列表。
*
* 响应中每条配置含 scope 字段:
* - "COMPANY":当前公司专属配置(优先生效)
* - "GLOBAL":全局默认配置(公司未覆盖时生效)
*/
@GetMapping("/api/config")
@RequiresRoles("ADMIN")
public Result<List<Map<String, Object>>> listConfig(HttpServletRequest request) {
TokenPrincipal principal = principal(request);
return Result.success(sysConfigService.list(principal.getCompanyId()));
}
/**
* PUT /api/config/{key} — UPSERT 公司专属配置。
*
* Body: { "value": "...", "description": "..." }
*/
@PutMapping("/api/config/{key}")
@RequiresRoles("ADMIN")
public Result<SysConfig> updateConfig(@PathVariable String key,
@RequestBody Map<String, String> body,
HttpServletRequest request) {
String value = body.get("value");
String description = body.get("description");
TokenPrincipal principal = principal(request);
return Result.success(
sysConfigService.update(key, value, description, principal.getCompanyId()));
}
private TokenPrincipal principal(HttpServletRequest request) {
return (TokenPrincipal) request.getAttribute("__token_principal__");
}
}

View File

@@ -0,0 +1,41 @@
package com.label.module.config.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 系统配置实体,对应 sys_config 表。
*
* company_id 为 NULL 时表示全局默认配置,非 NULL 时表示租户专属配置(优先级更高)。
* 注sys_config 已加入 MybatisPlusConfig.IGNORED_TABLES不走多租户过滤器。
*/
@Data
@TableName("sys_config")
public class SysConfig {
@TableId(type = IdType.AUTO)
private Long id;
/**
* 所属公司 IDNULL = 全局默认配置;非 NULL = 租户专属配置)。
* 注意:不能用 @TableField(exist = false) 排除,必须保留以支持 company_id IS NULL 查询。
*/
private Long companyId;
/** 配置键 */
private String configKey;
/** 配置值 */
private String configValue;
/** 配置说明 */
private String description;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}

View File

@@ -0,0 +1,36 @@
package com.label.module.config.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.label.module.config.entity.SysConfig;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
/**
* sys_config 表 Mapper。
*
* 注意sys_config 已加入 MybatisPlusConfig.IGNORED_TABLES不走多租户过滤器
* 需手动传入 companyId 进行过滤。
*/
@Mapper
public interface SysConfigMapper extends BaseMapper<SysConfig> {
/** 查询指定公司的配置(租户专属,优先级高) */
@Select("SELECT * FROM sys_config WHERE company_id = #{companyId} AND config_key = #{configKey}")
SysConfig selectByCompanyAndKey(@Param("companyId") Long companyId,
@Param("configKey") String configKey);
/** 查询全局默认配置company_id IS NULL */
@Select("SELECT * FROM sys_config WHERE company_id IS NULL AND config_key = #{configKey}")
SysConfig selectGlobalByKey(@Param("configKey") String configKey);
/**
* 查询指定公司所有可见配置(公司专属 + 全局默认),
* 按 company_id DESC NULLS LAST 排序(公司专属优先于全局默认)。
*/
@Select("SELECT * FROM sys_config WHERE company_id = #{companyId} OR company_id IS NULL " +
"ORDER BY company_id DESC NULLS LAST")
List<SysConfig> selectAllForCompany(@Param("companyId") Long companyId);
}

View File

@@ -0,0 +1,139 @@
package com.label.module.config.service;
import com.label.common.exception.BusinessException;
import com.label.module.config.entity.SysConfig;
import com.label.module.config.mapper.SysConfigMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
* 系统配置服务。
*
* 配置查找优先级公司专属company_id = N> 全局默认company_id IS NULL
*
* get() — 按优先级返回单个配置值
* list() — 返回合并后的配置列表(公司专属覆盖同名全局配置),附 scope 字段
* update() — 以公司专属配置进行 UPSERT仅允许已知配置键
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class SysConfigService {
/** 系统已知配置键白名单(防止写入未知键) */
private static final Set<String> KNOWN_KEYS = Set.of(
"token_ttl_seconds",
"model_default",
"video_frame_interval"
);
private final SysConfigMapper configMapper;
// ------------------------------------------------------------------ 查询单值 --
/**
* 按优先级获取配置值:公司专属优先,否则回退全局默认。
*
* @param configKey 配置键
* @param companyId 当前公司 ID
* @return 配置值(不存在时返回 null
*/
public String get(String configKey, Long companyId) {
// 先查公司专属
SysConfig company = configMapper.selectByCompanyAndKey(companyId, configKey);
if (company != null) {
return company.getConfigValue();
}
// 回退全局默认
SysConfig global = configMapper.selectGlobalByKey(configKey);
return global != null ? global.getConfigValue() : null;
}
// ------------------------------------------------------------------ 查询列表 --
/**
* 返回当前公司所有可见配置(公司专属 + 全局默认合并),
* 附加 scope 字段("COMPANY" / "GLOBAL")标识来源。
*
* @param companyId 当前公司 ID
* @return 配置列表(含 scope
*/
public List<Map<String, Object>> list(Long companyId) {
List<SysConfig> all = configMapper.selectAllForCompany(companyId);
// 按 configKey 分组,公司专属优先(排序保证公司专属在前)
Map<String, SysConfig> merged = new LinkedHashMap<>();
for (SysConfig cfg : all) {
// 由于 SQL 按 company_id DESC NULLS LAST 排序,公司专属先出现,直接 putIfAbsent
merged.putIfAbsent(cfg.getConfigKey(), cfg);
}
return merged.values().stream()
.map(cfg -> {
Map<String, Object> item = new LinkedHashMap<>();
item.put("id", cfg.getId());
item.put("configKey", cfg.getConfigKey());
item.put("configValue", cfg.getConfigValue());
item.put("description", cfg.getDescription());
item.put("scope", cfg.getCompanyId() != null ? "COMPANY" : "GLOBAL");
item.put("companyId", cfg.getCompanyId());
return item;
})
.collect(Collectors.toList());
}
// ------------------------------------------------------------------ 更新配置 --
/**
* 更新公司专属配置UPSERT
*
* 仅允许 KNOWN_KEYS 中的配置键,防止写入未定义的配置项。
*
* @param configKey 配置键
* @param value 新配置值
* @param description 配置说明(可选)
* @param companyId 当前公司 ID
*/
@Transactional
public SysConfig update(String configKey, String value,
String description, Long companyId) {
if (!KNOWN_KEYS.contains(configKey)) {
throw new BusinessException("UNKNOWN_CONFIG_KEY",
"未知配置键: " + configKey, HttpStatus.BAD_REQUEST);
}
if (value == null || value.isBlank()) {
throw new BusinessException("INVALID_CONFIG_VALUE",
"配置值不能为空", HttpStatus.BAD_REQUEST);
}
// UPSERT如公司专属配置已存在则更新否则插入
SysConfig existing = configMapper.selectByCompanyAndKey(companyId, configKey);
if (existing != null) {
existing.setConfigValue(value);
if (description != null && !description.isBlank()) {
existing.setDescription(description);
}
existing.setUpdatedAt(LocalDateTime.now());
configMapper.updateById(existing);
log.debug("公司配置已更新: companyId={}, key={}, value={}", companyId, configKey, value);
return existing;
} else {
SysConfig cfg = new SysConfig();
cfg.setCompanyId(companyId);
cfg.setConfigKey(configKey);
cfg.setConfigValue(value);
cfg.setDescription(description);
configMapper.insert(cfg);
log.debug("公司配置已创建: companyId={}, key={}, value={}", companyId, configKey, value);
return cfg;
}
}
}

View File

@@ -33,11 +33,14 @@ public class FinetuneService {
/**
* 向 GLM AI 服务提交微调任务。
*
* T074 设计AI 调用不在 @Transactional 内执行,避免持有 DB 连接期间发起 HTTP 请求。
* DB 写入updateFinetuneInfo是单条 UPDATE不需要显式事务自动提交
* 如果 AI 调用成功但 DB 写入失败,下次查询状态仍可通过 AI 服务的 jobId 重建状态。
*
* @param batchId 批次 ID
* @param principal 当前用户
* @return 包含 glmJobId 和 finetuneStatus 的 Map
*/
@Transactional
public Map<String, Object> trigger(Long batchId, TokenPrincipal principal) {
ExportBatch batch = exportService.getById(batchId, principal);
@@ -46,7 +49,7 @@ public class FinetuneService {
"微调任务已提交,当前状态: " + batch.getFinetuneStatus(), HttpStatus.CONFLICT);
}
// 调用 AI 服务提交微调(在事务外完成,此处事务仅保护后续 DB 写入
// 调用 AI 服务(无事务,不持有 DB 连接
AiServiceClient.FinetuneRequest req = AiServiceClient.FinetuneRequest.builder()
.datasetPath(batch.getDatasetFilePath())
.model("glm-4")
@@ -61,7 +64,7 @@ public class FinetuneService {
"提交微调任务失败: " + e.getMessage(), HttpStatus.SERVICE_UNAVAILABLE);
}
// 更新批次记录
// AI 调用成功后更新批次记录(单条 UPDATE自动提交
exportBatchMapper.updateFinetuneInfo(batchId,
response.getJobId(), "RUNNING", principal.getCompanyId());

View File

@@ -9,7 +9,6 @@ import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.shiro.authz.annotation.RequiresRoles;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
@@ -33,13 +32,13 @@ public class SourceController {
*/
@PostMapping("/upload")
@RequiresRoles("UPLOADER")
public ResponseEntity<Result<SourceResponse>> upload(
@ResponseStatus(HttpStatus.CREATED)
public Result<SourceResponse> upload(
@RequestParam("file") MultipartFile file,
@RequestParam("dataType") String dataType,
HttpServletRequest request) {
TokenPrincipal principal = (TokenPrincipal) request.getAttribute("__token_principal__");
SourceResponse response = sourceService.upload(file, dataType, principal);
return ResponseEntity.status(HttpStatus.CREATED).body(Result.success(response));
return Result.success(sourceService.upload(file, dataType, principal));
}
/**

View File

@@ -0,0 +1,85 @@
package com.label.module.video.controller;
import com.label.common.result.Result;
import com.label.common.shiro.TokenPrincipal;
import com.label.module.video.entity.VideoProcessJob;
import com.label.module.video.service.VideoProcessService;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.authz.annotation.RequiresRoles;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
* 视频处理接口4 个端点)。
*
* POST /api/video/process — 触发视频处理ADMIN
* GET /api/video/jobs/{jobId} — 查询任务状态ADMIN
* POST /api/video/jobs/{jobId}/reset — 重置失败任务ADMIN
* POST /api/video/callback — AI 回调接口(无需认证,已在 TokenFilter 中排除)
*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class VideoController {
private final VideoProcessService videoProcessService;
/** POST /api/video/process — 触发视频处理任务 */
@PostMapping("/api/video/process")
@RequiresRoles("ADMIN")
public Result<VideoProcessJob> createJob(@RequestBody Map<String, Object> body,
HttpServletRequest request) {
Long sourceId = Long.parseLong(body.get("sourceId").toString());
String jobType = (String) body.get("jobType");
String params = body.containsKey("params") ? body.get("params").toString() : null;
TokenPrincipal principal = principal(request);
return Result.success(
videoProcessService.createJob(sourceId, jobType, params, principal.getCompanyId()));
}
/** GET /api/video/jobs/{jobId} — 查询视频处理任务 */
@GetMapping("/api/video/jobs/{jobId}")
@RequiresRoles("ADMIN")
public Result<VideoProcessJob> getJob(@PathVariable Long jobId,
HttpServletRequest request) {
return Result.success(videoProcessService.getJob(jobId, principal(request).getCompanyId()));
}
/** POST /api/video/jobs/{jobId}/reset — 管理员重置失败任务 */
@PostMapping("/api/video/jobs/{jobId}/reset")
@RequiresRoles("ADMIN")
public Result<VideoProcessJob> resetJob(@PathVariable Long jobId,
HttpServletRequest request) {
return Result.success(videoProcessService.reset(jobId, principal(request).getCompanyId()));
}
/**
* POST /api/video/callback — AI 服务回调(无需 Bearer Token
*
* 此端点已在 TokenFilter.shouldNotFilter() 中排除认证,
* 由 AI 服务直接调用,携带 jobId、status、outputPath 等参数。
*
* Body 示例:
* { "jobId": 123, "status": "SUCCESS", "outputPath": "processed/123/frames.zip" }
* { "jobId": 123, "status": "FAILED", "errorMessage": "ffmpeg error: ..." }
*/
@PostMapping("/api/video/callback")
public Result<Void> handleCallback(@RequestBody Map<String, Object> body) {
Long jobId = Long.parseLong(body.get("jobId").toString());
String status = (String) body.get("status");
String outputPath = body.containsKey("outputPath") ? (String) body.get("outputPath") : null;
String errorMessage = body.containsKey("errorMessage") ? (String) body.get("errorMessage") : null;
log.info("视频处理回调jobId={}, status={}", jobId, status);
videoProcessService.handleCallback(jobId, status, outputPath, errorMessage);
return Result.success(null);
}
private TokenPrincipal principal(HttpServletRequest request) {
return (TokenPrincipal) request.getAttribute("__token_principal__");
}
}

View File

@@ -0,0 +1,57 @@
package com.label.module.video.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 视频处理任务实体,对应 video_process_job 表。
*
* jobType 取值FRAME_EXTRACT / VIDEO_TO_TEXT
* status 取值PENDING / RUNNING / SUCCESS / FAILED / RETRYING
*/
@Data
@TableName("video_process_job")
public class VideoProcessJob {
@TableId(type = IdType.AUTO)
private Long id;
/** 所属公司(多租户键) */
private Long companyId;
/** 关联资料 ID */
private Long sourceId;
/** 任务类型FRAME_EXTRACT / VIDEO_TO_TEXT */
private String jobType;
/** 任务状态PENDING / RUNNING / SUCCESS / FAILED / RETRYING */
private String status;
/** 任务参数JSONB例如 {"frameInterval": 30} */
private String params;
/** AI 处理输出路径(成功后填写) */
private String outputPath;
/** 已重试次数 */
private Integer retryCount;
/** 最大重试次数(默认 3 */
private Integer maxRetries;
/** 错误信息 */
private String errorMessage;
private LocalDateTime startedAt;
private LocalDateTime completedAt;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}

View File

@@ -0,0 +1,12 @@
package com.label.module.video.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.label.module.video.entity.VideoProcessJob;
import org.apache.ibatis.annotations.Mapper;
/**
* video_process_job 表 Mapper。
*/
@Mapper
public interface VideoProcessJobMapper extends BaseMapper<VideoProcessJob> {
}

View File

@@ -0,0 +1,289 @@
package com.label.module.video.service;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.label.common.ai.AiServiceClient;
import com.label.common.exception.BusinessException;
import com.label.common.statemachine.SourceStatus;
import com.label.common.statemachine.StateValidator;
import com.label.module.source.entity.SourceData;
import com.label.module.source.mapper.SourceDataMapper;
import com.label.module.video.entity.VideoProcessJob;
import com.label.module.video.mapper.VideoProcessJobMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.time.LocalDateTime;
import java.util.Map;
/**
* 视频处理服务:创建任务、处理回调、管理员重置。
*
* 状态流转:
* - 创建时source_data → PREPROCESSINGjob → PENDING
* - 回调成功job → SUCCESSsource_data → PENDING进入提取队列
* - 回调失败可重试job → RETRYINGretryCount++,重新触发 AI
* - 回调失败超出上限job → FAILEDsource_data → PENDING
* - 管理员重置job → PENDING可手动重新触发
*
* T074 设计说明:
* AI 调用通过 TransactionSynchronizationManager.registerSynchronization().afterCommit()
* 延迟到事务提交后执行,避免在持有 DB 连接期间进行 HTTP 调用。
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class VideoProcessService {
private final VideoProcessJobMapper jobMapper;
private final SourceDataMapper sourceDataMapper;
private final AiServiceClient aiServiceClient;
@Value("${rustfs.bucket:label-source-data}")
private String bucket;
// ------------------------------------------------------------------ 创建任务 --
/**
* 创建视频处理任务并在事务提交后触发 AI 服务。
*
* DB 写入source_data→PREPROCESSING + 插入 job在 @Transactional 内完成;
* AI 触发通过 afterCommit() 在事务提交后执行,不占用 DB 连接。
*
* @param sourceId 资料 ID
* @param jobType 任务类型FRAME_EXTRACT / VIDEO_TO_TEXT
* @param params JSON 参数(如 {"frameInterval": 30}
* @param companyId 租户 ID
* @return 新建的 VideoProcessJob
*/
@Transactional
public VideoProcessJob createJob(Long sourceId, String jobType,
String params, Long companyId) {
SourceData source = sourceDataMapper.selectById(sourceId);
if (source == null || !companyId.equals(source.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "资料不存在: " + sourceId, HttpStatus.NOT_FOUND);
}
validateJobType(jobType);
// source_data → PREPROCESSING
StateValidator.assertTransition(
SourceStatus.TRANSITIONS,
SourceStatus.valueOf(source.getStatus()), SourceStatus.PREPROCESSING);
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, sourceId)
.set(SourceData::getStatus, "PREPROCESSING")
.set(SourceData::getUpdatedAt, LocalDateTime.now()));
// 插入 PENDING 任务
VideoProcessJob job = new VideoProcessJob();
job.setCompanyId(companyId);
job.setSourceId(sourceId);
job.setJobType(jobType);
job.setStatus("PENDING");
job.setParams(params != null ? params : "{}");
job.setRetryCount(0);
job.setMaxRetries(3);
jobMapper.insert(job);
// 事务提交后触发 AI不在事务内不占用 DB 连接)
final Long jobId = job.getId();
final String filePath = source.getFilePath();
final String finalJobType = jobType;
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
triggerAi(jobId, sourceId, filePath, finalJobType);
}
});
log.debug("视频处理任务已创建AI 将在事务提交后触发): jobId={}, sourceId={}", jobId, sourceId);
return job;
}
// ------------------------------------------------------------------ 处理回调 --
/**
* 处理 AI 服务异步回调POST /api/video/callback无需用户 Token
*
* 幂等:若 job 已为 SUCCESS直接返回防止重复处理。
* 重试触发同样延迟到事务提交后afterCommit不在事务内执行。
*
* @param jobId 任务 ID
* @param callbackStatus AI 回调状态SUCCESS / FAILED
* @param outputPath 成功时的输出路径(可选)
* @param errorMessage 失败时的错误信息(可选)
*/
@Transactional
public void handleCallback(Long jobId, String callbackStatus,
String outputPath, String errorMessage) {
VideoProcessJob job = jobMapper.selectById(jobId);
if (job == null) {
log.warn("视频处理回调job 不存在jobId={}", jobId);
return;
}
// 幂等:已成功则忽略重复回调
if ("SUCCESS".equals(job.getStatus())) {
log.debug("视频处理回调幂等jobId={} 已为 SUCCESS跳过", jobId);
return;
}
if ("SUCCESS".equals(callbackStatus)) {
handleSuccess(job, outputPath);
} else {
handleFailure(job, errorMessage);
}
}
// ------------------------------------------------------------------ 管理员重置 --
/**
* 管理员手动重置失败任务FAILED → PENDING
*
* 仅允许 FAILED 状态的任务重置,重置后 retryCount 清零,
* 管理员可随后重新调用 createJob 触发处理。
*
* @param jobId 任务 ID
* @param companyId 租户 ID
*/
@Transactional
public VideoProcessJob reset(Long jobId, Long companyId) {
VideoProcessJob job = jobMapper.selectById(jobId);
if (job == null || !companyId.equals(job.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND);
}
if (!"FAILED".equals(job.getStatus())) {
throw new BusinessException("INVALID_TRANSITION",
"只有 FAILED 状态的任务可以重置,当前状态: " + job.getStatus(),
HttpStatus.BAD_REQUEST);
}
jobMapper.update(null, new LambdaUpdateWrapper<VideoProcessJob>()
.eq(VideoProcessJob::getId, jobId)
.set(VideoProcessJob::getStatus, "PENDING")
.set(VideoProcessJob::getRetryCount, 0)
.set(VideoProcessJob::getErrorMessage, null)
.set(VideoProcessJob::getUpdatedAt, LocalDateTime.now()));
job.setStatus("PENDING");
job.setRetryCount(0);
log.debug("视频处理任务已重置: jobId={}", jobId);
return job;
}
// ------------------------------------------------------------------ 查询 --
public VideoProcessJob getJob(Long jobId, Long companyId) {
VideoProcessJob job = jobMapper.selectById(jobId);
if (job == null || !companyId.equals(job.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND);
}
return job;
}
// ------------------------------------------------------------------ 私有方法 --
private void handleSuccess(VideoProcessJob job, String outputPath) {
// job → SUCCESS
jobMapper.update(null, new LambdaUpdateWrapper<VideoProcessJob>()
.eq(VideoProcessJob::getId, job.getId())
.set(VideoProcessJob::getStatus, "SUCCESS")
.set(VideoProcessJob::getOutputPath, outputPath)
.set(VideoProcessJob::getCompletedAt, LocalDateTime.now())
.set(VideoProcessJob::getUpdatedAt, LocalDateTime.now()));
// source_data PREPROCESSING → PENDING进入提取队列
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, job.getSourceId())
.set(SourceData::getStatus, "PENDING")
.set(SourceData::getUpdatedAt, LocalDateTime.now()));
log.debug("视频处理成功jobId={}, sourceId={}", job.getId(), job.getSourceId());
}
private void handleFailure(VideoProcessJob job, String errorMessage) {
int newRetryCount = job.getRetryCount() + 1;
int maxRetries = job.getMaxRetries() != null ? job.getMaxRetries() : 3;
if (newRetryCount < maxRetries) {
// 仍有重试次数job → RETRYING事务提交后重新触发 AI
jobMapper.update(null, new LambdaUpdateWrapper<VideoProcessJob>()
.eq(VideoProcessJob::getId, job.getId())
.set(VideoProcessJob::getStatus, "RETRYING")
.set(VideoProcessJob::getRetryCount, newRetryCount)
.set(VideoProcessJob::getErrorMessage, errorMessage)
.set(VideoProcessJob::getUpdatedAt, LocalDateTime.now()));
log.warn("视频处理失败,开始第 {} 次重试jobId={}, error={}",
newRetryCount, job.getId(), errorMessage);
// 重试 AI 触发延迟到事务提交后
SourceData source = sourceDataMapper.selectById(job.getSourceId());
if (source != null) {
final Long jobId = job.getId();
final Long sourceId = job.getSourceId();
final String filePath = source.getFilePath();
final String jobType = job.getJobType();
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
triggerAi(jobId, sourceId, filePath, jobType);
}
});
}
} else {
// 超出最大重试次数job → FAILEDsource_data → PENDING
jobMapper.update(null, new LambdaUpdateWrapper<VideoProcessJob>()
.eq(VideoProcessJob::getId, job.getId())
.set(VideoProcessJob::getStatus, "FAILED")
.set(VideoProcessJob::getRetryCount, newRetryCount)
.set(VideoProcessJob::getErrorMessage, errorMessage)
.set(VideoProcessJob::getCompletedAt, LocalDateTime.now())
.set(VideoProcessJob::getUpdatedAt, LocalDateTime.now()));
// source_data PREPROCESSING → PENDING管理员可重新处理
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, job.getSourceId())
.set(SourceData::getStatus, "PENDING")
.set(SourceData::getUpdatedAt, LocalDateTime.now()));
log.error("视频处理永久失败jobId={}, sourceId={}, error={}",
job.getId(), job.getSourceId(), errorMessage);
}
}
private void triggerAi(Long jobId, Long sourceId, String filePath, String jobType) {
AiServiceClient.VideoProcessRequest req = AiServiceClient.VideoProcessRequest.builder()
.sourceId(sourceId)
.filePath(filePath)
.bucket(bucket)
.params(Map.of("jobId", jobId, "jobType", jobType))
.build();
try {
if ("FRAME_EXTRACT".equals(jobType)) {
aiServiceClient.extractFrames(req);
} else {
aiServiceClient.videoToText(req);
}
log.debug("AI 触发成功: jobId={}", jobId);
} catch (Exception e) {
log.warn("触发视频处理 AI 失败jobId={}{}job 保持当前状态等待重试", jobId, e.getMessage());
}
}
private void validateJobType(String jobType) {
if (!"FRAME_EXTRACT".equals(jobType) && !"VIDEO_TO_TEXT".equals(jobType)) {
throw new BusinessException("INVALID_JOB_TYPE",
"任务类型不合法,应为 FRAME_EXTRACT 或 VIDEO_TO_TEXT", HttpStatus.BAD_REQUEST);
}
}
}