package com.label.service; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.label.common.ai.AiServiceClient; import com.label.common.exception.BusinessException; import com.label.common.statemachine.SourceStatus; import com.label.common.statemachine.StateValidator; import com.label.entity.SourceData; import com.label.mapper.SourceDataMapper; import com.label.entity.VideoProcessJob; import com.label.mapper.VideoProcessJobMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import java.time.LocalDateTime; import java.util.Map; /** * 视频处理服务:创建任务、处理回调、管理员重置。 * * 状态流转: * - 创建时:source_data → PREPROCESSING,job → PENDING * - 回调成功:job → SUCCESS,source_data → PENDING(进入提取队列) * - 回调失败(可重试):job → RETRYING,retryCount++,重新触发 AI * - 回调失败(超出上限):job → FAILED,source_data → PENDING * - 管理员重置:job → PENDING(可手动重新触发) * * T074 设计说明: * AI 调用通过 TransactionSynchronizationManager.registerSynchronization().afterCommit() * 延迟到事务提交后执行,避免在持有 DB 连接期间进行 HTTP 调用。 */ @Slf4j @Service @RequiredArgsConstructor public class VideoProcessService { private final VideoProcessJobMapper jobMapper; private final SourceDataMapper sourceDataMapper; private final AiServiceClient aiServiceClient; @Value("${rustfs.bucket:label-source-data}") private String bucket; // ------------------------------------------------------------------ 创建任务 -- /** * 创建视频处理任务并在事务提交后触发 AI 服务。 * * DB 写入(source_data→PREPROCESSING + 插入 job)在 @Transactional 内完成; * AI 触发通过 afterCommit() 在事务提交后执行,不占用 DB 连接。 * * @param sourceId 资料 ID * @param jobType 任务类型(FRAME_EXTRACT / VIDEO_TO_TEXT) * @param params JSON 参数(如 {"frameInterval": 30}) * @param companyId 租户 ID * @return 新建的 VideoProcessJob */ @Transactional public VideoProcessJob createJob(Long sourceId, String jobType, String params, Long companyId) { SourceData source = sourceDataMapper.selectById(sourceId); if (source == null || !companyId.equals(source.getCompanyId())) { throw new BusinessException("NOT_FOUND", "资料不存在: " + sourceId, HttpStatus.NOT_FOUND); } validateJobType(jobType); // source_data → PREPROCESSING StateValidator.assertTransition( SourceStatus.TRANSITIONS, SourceStatus.valueOf(source.getStatus()), SourceStatus.PREPROCESSING); sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, sourceId) .set(SourceData::getStatus, "PREPROCESSING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); // 插入 PENDING 任务 VideoProcessJob job = new VideoProcessJob(); job.setCompanyId(companyId); job.setSourceId(sourceId); job.setJobType(jobType); job.setStatus("PENDING"); job.setParams(params != null ? params : "{}"); job.setRetryCount(0); job.setMaxRetries(3); jobMapper.insert(job); // 事务提交后触发 AI(不在事务内,不占用 DB 连接) final Long jobId = job.getId(); final String filePath = source.getFilePath(); final String finalJobType = jobType; TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { triggerAi(jobId, sourceId, filePath, finalJobType); } }); log.info("视频处理任务已创建(AI 将在事务提交后触发): jobId={}, sourceId={}", jobId, sourceId); return job; } // ------------------------------------------------------------------ 处理回调 -- /** * 处理 AI 服务异步回调(POST /api/video/callback,无需用户 Token)。 * * 幂等:若 job 已为 SUCCESS,直接返回,防止重复处理。 * 重试触发同样延迟到事务提交后(afterCommit),不在事务内执行。 * * @param jobId 任务 ID * @param callbackStatus AI 回调状态(SUCCESS / FAILED) * @param outputPath 成功时的输出路径(可选) * @param errorMessage 失败时的错误信息(可选) */ @Transactional public void handleCallback(Long jobId, String callbackStatus, String outputPath, String errorMessage) { // video_process_job 在 IGNORED_TABLES 中(回调无 CompanyContext),此处显式校验 VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || job.getCompanyId() == null) { log.warn("视频处理回调:job 不存在,jobId={}", jobId); return; } // 幂等:已成功则忽略重复回调 if ("SUCCESS".equals(job.getStatus())) { log.info("视频处理回调幂等:jobId={} 已为 SUCCESS,跳过", jobId); return; } if ("SUCCESS".equals(callbackStatus)) { handleSuccess(job, outputPath); } else { handleFailure(job, errorMessage); } } // ------------------------------------------------------------------ 管理员重置 -- /** * 管理员手动重置失败任务(FAILED → PENDING)。 * * 仅允许 FAILED 状态的任务重置,重置后 retryCount 清零, * 管理员可随后重新调用 createJob 触发处理。 * * @param jobId 任务 ID * @param companyId 租户 ID */ @Transactional public VideoProcessJob reset(Long jobId, Long companyId) { VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || !companyId.equals(job.getCompanyId())) { throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND); } if (!"FAILED".equals(job.getStatus())) { throw new BusinessException("INVALID_TRANSITION", "只有 FAILED 状态的任务可以重置,当前状态: " + job.getStatus(), HttpStatus.BAD_REQUEST); } jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, jobId) .set(VideoProcessJob::getStatus, "PENDING") .set(VideoProcessJob::getRetryCount, 0) .set(VideoProcessJob::getErrorMessage, null) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); job.setStatus("PENDING"); job.setRetryCount(0); log.info("视频处理任务已重置: jobId={}", jobId); return job; } // ------------------------------------------------------------------ 查询 -- public VideoProcessJob getJob(Long jobId, Long companyId) { VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || !companyId.equals(job.getCompanyId())) { throw new BusinessException("NOT_FOUND", "视频处理任务不存在: " + jobId, HttpStatus.NOT_FOUND); } return job; } // ------------------------------------------------------------------ 私有方法 -- private void handleSuccess(VideoProcessJob job, String outputPath) { // job → SUCCESS jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "SUCCESS") .set(VideoProcessJob::getOutputPath, outputPath) .set(VideoProcessJob::getCompletedAt, LocalDateTime.now()) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); // source_data PREPROCESSING → PENDING(进入提取队列) sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, job.getSourceId()) .set(SourceData::getStatus, "PENDING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); log.info("视频处理成功:jobId={}, sourceId={}", job.getId(), job.getSourceId()); } private void handleFailure(VideoProcessJob job, String errorMessage) { int newRetryCount = job.getRetryCount() + 1; int maxRetries = job.getMaxRetries() != null ? job.getMaxRetries() : 3; if (newRetryCount < maxRetries) { // 仍有重试次数:job → RETRYING,事务提交后重新触发 AI jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "RETRYING") .set(VideoProcessJob::getRetryCount, newRetryCount) .set(VideoProcessJob::getErrorMessage, errorMessage) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); log.warn("视频处理失败,开始第 {} 次重试:jobId={}, error={}", newRetryCount, job.getId(), errorMessage); // 重试 AI 触发延迟到事务提交后 SourceData source = sourceDataMapper.selectById(job.getSourceId()); if (source != null) { final Long jobId = job.getId(); final Long sourceId = job.getSourceId(); final String filePath = source.getFilePath(); final String jobType = job.getJobType(); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { triggerAi(jobId, sourceId, filePath, jobType); } }); } } else { // 超出最大重试次数:job → FAILED,source_data → PENDING jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "FAILED") .set(VideoProcessJob::getRetryCount, newRetryCount) .set(VideoProcessJob::getErrorMessage, errorMessage) .set(VideoProcessJob::getCompletedAt, LocalDateTime.now()) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); // source_data PREPROCESSING → PENDING(管理员可重新处理) sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, job.getSourceId()) .set(SourceData::getStatus, "PENDING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); log.error("视频处理永久失败:jobId={}, sourceId={}, error={}", job.getId(), job.getSourceId(), errorMessage); } } private void triggerAi(Long jobId, Long sourceId, String filePath, String jobType) { AiServiceClient.VideoProcessRequest req = AiServiceClient.VideoProcessRequest.builder() .sourceId(sourceId) .filePath(filePath) .bucket(bucket) .params(Map.of("jobId", jobId, "jobType", jobType)) .build(); try { if ("FRAME_EXTRACT".equals(jobType)) { aiServiceClient.extractFrames(req); } else { aiServiceClient.videoToText(req); } log.info("AI 触发成功: jobId={}", jobId); } catch (Exception e) { log.error("触发视频处理 AI 失败(jobId={}):{},job 保持当前状态,需管理员手动重置", jobId, e.getMessage()); } } private void validateJobType(String jobType) { if (!"FRAME_EXTRACT".equals(jobType) && !"VIDEO_TO_TEXT".equals(jobType)) { throw new BusinessException("INVALID_JOB_TYPE", "任务类型不合法,应为 FRAME_EXTRACT 或 VIDEO_TO_TEXT", HttpStatus.BAD_REQUEST); } } }