package com.label.service; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.label.common.ai.AiServiceClient; import com.label.common.exception.BusinessException; import com.label.common.statemachine.StateValidator; import com.label.common.statemachine.VideoSourceStatus; import com.label.entity.SourceData; import com.label.entity.VideoProcessJob; import com.label.mapper.SourceDataMapper; import com.label.mapper.VideoProcessJobMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; import java.time.LocalDateTime; import java.util.Map; @Slf4j @Service @RequiredArgsConstructor public class VideoProcessService { private final VideoProcessJobMapper jobMapper; private final SourceDataMapper sourceDataMapper; private final AiServiceClient aiServiceClient; private final ObjectMapper objectMapper; @Transactional public VideoProcessJob createJob(Long sourceId, String jobType, String params, Long companyId) { SourceData source = sourceDataMapper.selectById(sourceId); if (source == null || !companyId.equals(source.getCompanyId())) { throw new BusinessException("NOT_FOUND", "资料不存在 " + sourceId, HttpStatus.NOT_FOUND); } validateJobType(jobType); StateValidator.assertTransition( VideoSourceStatus.TRANSITIONS, VideoSourceStatus.valueOf(source.getStatus()), VideoSourceStatus.PREPROCESSING ); sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, sourceId) .set(SourceData::getStatus, "PREPROCESSING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); VideoProcessJob job = new VideoProcessJob(); job.setCompanyId(companyId); job.setSourceId(sourceId); job.setJobType(jobType); job.setStatus("PENDING"); job.setParams(params != null ? params : "{}"); job.setRetryCount(0); job.setMaxRetries(3); jobMapper.insert(job); final Long jobId = job.getId(); final String filePath = source.getFilePath(); final String finalJobType = jobType; final String finalParams = job.getParams(); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { triggerAi(jobId, sourceId, filePath, finalJobType, finalParams); } }); log.info("视频处理任务已创建: jobId={}, sourceId={}", jobId, sourceId); return job; } @Transactional public void handleCallback(Long jobId, String callbackStatus, String outputPath, String errorMessage) { VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || job.getCompanyId() == null) { log.warn("视频处理回调时 job 不存在: jobId={}", jobId); return; } if ("SUCCESS".equals(job.getStatus())) { log.info("视频处理回调幂等跳过: jobId={}", jobId); return; } if ("SUCCESS".equals(callbackStatus)) { handleSuccess(job, outputPath); } else { handleFailure(job, errorMessage); } } @Transactional public VideoProcessJob reset(Long jobId, Long companyId) { VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || !companyId.equals(job.getCompanyId())) { throw new BusinessException("NOT_FOUND", "视频处理任务不存在 " + jobId, HttpStatus.NOT_FOUND); } if (!"FAILED".equals(job.getStatus())) { throw new BusinessException( "INVALID_TRANSITION", "只有 FAILED 状态的任务可以重置,当前状态 " + job.getStatus(), HttpStatus.BAD_REQUEST ); } jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, jobId) .set(VideoProcessJob::getStatus, "PENDING") .set(VideoProcessJob::getRetryCount, 0) .set(VideoProcessJob::getErrorMessage, null) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); job.setStatus("PENDING"); job.setRetryCount(0); return job; } public VideoProcessJob getJob(Long jobId, Long companyId) { VideoProcessJob job = jobMapper.selectById(jobId); if (job == null || !companyId.equals(job.getCompanyId())) { throw new BusinessException("NOT_FOUND", "视频处理任务不存在 " + jobId, HttpStatus.NOT_FOUND); } return job; } private void handleSuccess(VideoProcessJob job, String outputPath) { jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "SUCCESS") .set(VideoProcessJob::getOutputPath, outputPath) .set(VideoProcessJob::getCompletedAt, LocalDateTime.now()) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, job.getSourceId()) .set(SourceData::getStatus, "PENDING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); } private void handleFailure(VideoProcessJob job, String errorMessage) { int newRetryCount = job.getRetryCount() + 1; int maxRetries = job.getMaxRetries() != null ? job.getMaxRetries() : 3; if (newRetryCount < maxRetries) { jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "RETRYING") .set(VideoProcessJob::getRetryCount, newRetryCount) .set(VideoProcessJob::getErrorMessage, errorMessage) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); SourceData source = sourceDataMapper.selectById(job.getSourceId()); if (source != null) { final Long jobId = job.getId(); final Long sourceId = job.getSourceId(); final String filePath = source.getFilePath(); final String jobType = job.getJobType(); final String params = job.getParams(); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { triggerAi(jobId, sourceId, filePath, jobType, params); } }); } } else { jobMapper.update(null, new LambdaUpdateWrapper() .eq(VideoProcessJob::getId, job.getId()) .set(VideoProcessJob::getStatus, "FAILED") .set(VideoProcessJob::getRetryCount, newRetryCount) .set(VideoProcessJob::getErrorMessage, errorMessage) .set(VideoProcessJob::getCompletedAt, LocalDateTime.now()) .set(VideoProcessJob::getUpdatedAt, LocalDateTime.now())); sourceDataMapper.update(null, new LambdaUpdateWrapper() .eq(SourceData::getId, job.getSourceId()) .set(SourceData::getStatus, "PENDING") .set(SourceData::getUpdatedAt, LocalDateTime.now())); } } private void triggerAi(Long jobId, Long sourceId, String filePath, String jobType, String paramsJson) { Map params = parseParams(paramsJson); try { if ("FRAME_EXTRACT".equals(jobType)) { aiServiceClient.extractFrames(AiServiceClient.ExtractFramesRequest.builder() .filePath(filePath) .sourceId(sourceId) .jobId(jobId) .mode(stringParam(params, "mode", "interval")) .frameInterval(intParam(params, "frameInterval", 30)) .build()); } else { aiServiceClient.videoToText(AiServiceClient.VideoToTextRequest.builder() .filePath(filePath) .sourceId(sourceId) .jobId(jobId) .startSec(doubleParam(params, "startSec", 0.0)) .endSec(doubleParam(params, "endSec", 120.0)) .model(stringParam(params, "model", null)) .promptTemplate(stringParam(params, "promptTemplate", null)) .build()); } log.info("AI 视频任务已触发: jobId={}", jobId); } catch (Exception e) { log.error("触发视频处理 AI 失败(jobId={}): {}", jobId, e.getMessage()); } } private Map parseParams(String paramsJson) { if (paramsJson == null || paramsJson.isBlank()) { return Map.of(); } try { return objectMapper.readValue(paramsJson, new TypeReference<>() {}); } catch (Exception e) { log.warn("解析视频处理参数失败,将使用默认值: {}", e.getMessage()); return Map.of(); } } private String stringParam(Map params, String key, String defaultValue) { Object value = params.get(key); return value == null ? defaultValue : String.valueOf(value); } private Integer intParam(Map params, String key, Integer defaultValue) { Object value = params.get(key); if (value instanceof Number number) { return number.intValue(); } if (value instanceof String text && !text.isBlank()) { return Integer.parseInt(text); } return defaultValue; } private Double doubleParam(Map params, String key, Double defaultValue) { Object value = params.get(key); if (value instanceof Number number) { return number.doubleValue(); } if (value instanceof String text && !text.isBlank()) { return Double.parseDouble(text); } return defaultValue; } private void validateJobType(String jobType) { if (!"FRAME_EXTRACT".equals(jobType) && !"VIDEO_TO_TEXT".equals(jobType)) { throw new BusinessException( "INVALID_JOB_TYPE", "任务类型不合法,应为 FRAME_EXTRACT 或 VIDEO_TO_TEXT", HttpStatus.BAD_REQUEST ); } } }