package com.label.integration; import com.label.AbstractIntegrationTest; import com.label.service.RedisService; import com.label.util.RedisUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.http.*; import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; /** * 视频处理回调幂等与重试集成测试(US8)。 * * 测试场景: * 1. 同一 jobId 收到两次 SUCCESS 回调:annotation_task(EXTRACTION)仅创建一次 * 2. 超出最大重试次数 → job.status = FAILED,source_data.status = PENDING */ public class VideoCallbackIdempotencyTest extends AbstractIntegrationTest { private static final String ADMIN_TOKEN = "test-admin-token-video"; @Autowired private TestRestTemplate restTemplate; @Autowired private RedisService redisService; private Long companyId; private Long adminUserId; private Long sourceId; private Long jobId; @BeforeEach void setupTokenAndData() { companyId = jdbcTemplate.queryForObject( "SELECT id FROM sys_company WHERE company_code = 'DEMO'", Long.class); adminUserId = jdbcTemplate.queryForObject( "SELECT id FROM sys_user WHERE username = 'admin'", Long.class); // 伪造 Redis Token redisService.hSetAll(RedisUtil.tokenKey(ADMIN_TOKEN), Map.of("userId", adminUserId.toString(), "role", "ADMIN", "companyId", companyId.toString(), "username", "admin"), 3600L); // 插入 source_data(PREPROCESSING 状态,模拟视频处理中) jdbcTemplate.execute( "INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " + "file_name, file_size, bucket_name, status) " + "VALUES (" + companyId + ", " + adminUserId + ", 'VIDEO', " + "'videos/test.mp4', 'test.mp4', 10240, 'label-source-data', 'PREPROCESSING')"); sourceId = jdbcTemplate.queryForObject( "SELECT id FROM source_data ORDER BY id DESC LIMIT 1", Long.class); // 插入 PENDING 视频处理任务 jdbcTemplate.execute( "INSERT INTO video_process_job (company_id, source_id, job_type, status, " + "params, retry_count, max_retries) " + "VALUES (" + companyId + ", " + sourceId + ", 'FRAME_EXTRACT', 'PENDING', " + "'{}'::jsonb, 0, 3)"); jobId = jdbcTemplate.queryForObject( "SELECT id FROM video_process_job ORDER BY id DESC LIMIT 1", Long.class); } @AfterEach void cleanupTokens() { redisService.delete(RedisUtil.tokenKey(ADMIN_TOKEN)); } // ------------------------------------------------------------------ 测试 1: 幂等性 -- @Test @DisplayName("同一 jobId 发送两次 SUCCESS 回调:source_data 仅更新一次,status=PENDING") void successCallback_idempotent_sourceUpdatedOnce() { // 第一次 SUCCESS 回调 ResponseEntity resp1 = sendCallback(jobId, "SUCCESS", "processed/frames.zip", null); assertThat(resp1.getStatusCode()).isEqualTo(HttpStatus.OK); // 验证第一次回调后状态 String jobStatus1 = jdbcTemplate.queryForObject( "SELECT status FROM video_process_job WHERE id = ?", String.class, jobId); assertThat(jobStatus1).isEqualTo("SUCCESS"); String sourceStatus1 = jdbcTemplate.queryForObject( "SELECT status FROM source_data WHERE id = ?", String.class, sourceId); assertThat(sourceStatus1).isEqualTo("PENDING"); // 第二次 SUCCESS 回调(幂等:应直接返回,不重复处理) ResponseEntity resp2 = sendCallback(jobId, "SUCCESS", "processed/frames.zip", null); assertThat(resp2.getStatusCode()).isEqualTo(HttpStatus.OK); // 状态仍为 SUCCESS + PENDING,未被改变 String jobStatus2 = jdbcTemplate.queryForObject( "SELECT status FROM video_process_job WHERE id = ?", String.class, jobId); assertThat(jobStatus2).as("幂等:第二次回调不应改变 job 状态").isEqualTo("SUCCESS"); String sourceStatus2 = jdbcTemplate.queryForObject( "SELECT status FROM source_data WHERE id = ?", String.class, sourceId); assertThat(sourceStatus2).as("幂等:第二次回调不应改变 source_data 状态").isEqualTo("PENDING"); } // ------------------------------------------------------------------ 测试 2: 超出重试上限 → FAILED -- @Test @DisplayName("超出最大重试次数后 → job.status=FAILED,source_data.status=PENDING") void failedCallback_exceedsMaxRetries_jobBecomesFailedAndSourceReverts() { // 将 retry_count 设为 max_retries-1(再失败一次就超限) jdbcTemplate.execute( "UPDATE video_process_job SET retry_count = 2, max_retries = 3, " + "status = 'RETRYING' WHERE id = " + jobId); // 发送最后一次 FAILED 回调(retry_count 变为 3 = max_retries → 超限) ResponseEntity resp = sendCallback(jobId, "FAILED", null, "ffmpeg 处理超时"); assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); // 验证 job → FAILED Map jobRow = jdbcTemplate.queryForMap( "SELECT status, retry_count, error_message FROM video_process_job WHERE id = ?", jobId); assertThat(jobRow.get("status")).as("超出重试上限后 job 应为 FAILED").isEqualTo("FAILED"); assertThat(((Number) jobRow.get("retry_count")).intValue()).isEqualTo(3); assertThat(jobRow.get("error_message")).isEqualTo("ffmpeg 处理超时"); // 验证 source_data → PENDING(管理员可重新处理) String sourceStatus = jdbcTemplate.queryForObject( "SELECT status FROM source_data WHERE id = ?", String.class, sourceId); assertThat(sourceStatus).as("超出重试上限后 source_data 应回退为 PENDING").isEqualTo("PENDING"); } // ------------------------------------------------------------------ 测试 3: 管理员重置 -- @Test @DisplayName("管理员重置 FAILED 任务 → job.status=PENDING,retryCount=0") void resetFailedJob_succeeds() { // 先将任务置为 FAILED 状态 jdbcTemplate.execute( "UPDATE video_process_job SET status = 'FAILED', retry_count = 3 WHERE id = " + jobId); // 重置 HttpHeaders headers = new HttpHeaders(); headers.set("Authorization", "Bearer " + ADMIN_TOKEN); ResponseEntity resp = restTemplate.exchange( baseUrl("/api/video/jobs/" + jobId + "/reset"), HttpMethod.POST, new HttpEntity<>(headers), Map.class); assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK); // 验证 Map jobRow = jdbcTemplate.queryForMap( "SELECT status, retry_count FROM video_process_job WHERE id = ?", jobId); assertThat(jobRow.get("status")).isEqualTo("PENDING"); assertThat(((Number) jobRow.get("retry_count")).intValue()).isEqualTo(0); } // ------------------------------------------------------------------ 工具方法 -- private ResponseEntity sendCallback(Long jobId, String status, String outputPath, String errorMessage) { Map body; if ("SUCCESS".equals(status)) { body = Map.of("jobId", jobId, "status", status, "outputPath", outputPath); } else { body = Map.of("jobId", jobId, "status", status, "errorMessage", errorMessage != null ? errorMessage : ""); } HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); return restTemplate.exchange( baseUrl("/api/video/callback"), HttpMethod.POST, new HttpEntity<>(body, headers), Map.class); } }