184 lines
8.2 KiB
Java
184 lines
8.2 KiB
Java
package com.label.integration;
|
||
|
||
import com.label.AbstractIntegrationTest;
|
||
import com.label.service.RedisService;
|
||
import com.label.util.RedisUtil;
|
||
|
||
import org.junit.jupiter.api.AfterEach;
|
||
import org.junit.jupiter.api.BeforeEach;
|
||
import org.junit.jupiter.api.DisplayName;
|
||
import org.junit.jupiter.api.Test;
|
||
import org.springframework.beans.factory.annotation.Autowired;
|
||
import org.springframework.boot.test.web.client.TestRestTemplate;
|
||
import org.springframework.http.*;
|
||
|
||
import java.util.Map;
|
||
|
||
import static org.assertj.core.api.Assertions.assertThat;
|
||
|
||
/**
|
||
* 视频处理回调幂等与重试集成测试(US8)。
|
||
*
|
||
* 测试场景:
|
||
* 1. 同一 jobId 收到两次 SUCCESS 回调:annotation_task(EXTRACTION)仅创建一次
|
||
* 2. 超出最大重试次数 → job.status = FAILED,source_data.status = PENDING
|
||
*/
|
||
public class VideoCallbackIdempotencyTest extends AbstractIntegrationTest {
|
||
|
||
private static final String ADMIN_TOKEN = "test-admin-token-video";
|
||
|
||
@Autowired
|
||
private TestRestTemplate restTemplate;
|
||
|
||
@Autowired
|
||
private RedisService redisService;
|
||
|
||
private Long companyId;
|
||
private Long adminUserId;
|
||
private Long sourceId;
|
||
private Long jobId;
|
||
|
||
@BeforeEach
|
||
void setupTokenAndData() {
|
||
companyId = jdbcTemplate.queryForObject(
|
||
"SELECT id FROM sys_company WHERE company_code = 'DEMO'", Long.class);
|
||
adminUserId = jdbcTemplate.queryForObject(
|
||
"SELECT id FROM sys_user WHERE username = 'admin'", Long.class);
|
||
|
||
// 伪造 Redis Token
|
||
redisService.hSetAll(RedisUtil.tokenKey(ADMIN_TOKEN),
|
||
Map.of("userId", adminUserId.toString(), "role", "ADMIN",
|
||
"companyId", companyId.toString(), "username", "admin"),
|
||
3600L);
|
||
|
||
// 插入 source_data(PREPROCESSING 状态,模拟视频处理中)
|
||
jdbcTemplate.execute(
|
||
"INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " +
|
||
"file_name, file_size, bucket_name, status) " +
|
||
"VALUES (" + companyId + ", " + adminUserId + ", 'VIDEO', " +
|
||
"'videos/test.mp4', 'test.mp4', 10240, 'label-source-data', 'PREPROCESSING')");
|
||
sourceId = jdbcTemplate.queryForObject(
|
||
"SELECT id FROM source_data ORDER BY id DESC LIMIT 1", Long.class);
|
||
|
||
// 插入 PENDING 视频处理任务
|
||
jdbcTemplate.execute(
|
||
"INSERT INTO video_process_job (company_id, source_id, job_type, status, " +
|
||
"params, retry_count, max_retries) " +
|
||
"VALUES (" + companyId + ", " + sourceId + ", 'FRAME_EXTRACT', 'PENDING', " +
|
||
"'{}'::jsonb, 0, 3)");
|
||
jobId = jdbcTemplate.queryForObject(
|
||
"SELECT id FROM video_process_job ORDER BY id DESC LIMIT 1", Long.class);
|
||
}
|
||
|
||
@AfterEach
|
||
void cleanupTokens() {
|
||
redisService.delete(RedisUtil.tokenKey(ADMIN_TOKEN));
|
||
}
|
||
|
||
// ------------------------------------------------------------------ 测试 1: 幂等性 --
|
||
|
||
@Test
|
||
@DisplayName("同一 jobId 发送两次 SUCCESS 回调:source_data 仅更新一次,status=PENDING")
|
||
void successCallback_idempotent_sourceUpdatedOnce() {
|
||
// 第一次 SUCCESS 回调
|
||
ResponseEntity<Map> resp1 = sendCallback(jobId, "SUCCESS", "processed/frames.zip", null);
|
||
assertThat(resp1.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||
|
||
// 验证第一次回调后状态
|
||
String jobStatus1 = jdbcTemplate.queryForObject(
|
||
"SELECT status FROM video_process_job WHERE id = ?", String.class, jobId);
|
||
assertThat(jobStatus1).isEqualTo("SUCCESS");
|
||
|
||
String sourceStatus1 = jdbcTemplate.queryForObject(
|
||
"SELECT status FROM source_data WHERE id = ?", String.class, sourceId);
|
||
assertThat(sourceStatus1).isEqualTo("PENDING");
|
||
|
||
// 第二次 SUCCESS 回调(幂等:应直接返回,不重复处理)
|
||
ResponseEntity<Map> resp2 = sendCallback(jobId, "SUCCESS", "processed/frames.zip", null);
|
||
assertThat(resp2.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||
|
||
// 状态仍为 SUCCESS + PENDING,未被改变
|
||
String jobStatus2 = jdbcTemplate.queryForObject(
|
||
"SELECT status FROM video_process_job WHERE id = ?", String.class, jobId);
|
||
assertThat(jobStatus2).as("幂等:第二次回调不应改变 job 状态").isEqualTo("SUCCESS");
|
||
|
||
String sourceStatus2 = jdbcTemplate.queryForObject(
|
||
"SELECT status FROM source_data WHERE id = ?", String.class, sourceId);
|
||
assertThat(sourceStatus2).as("幂等:第二次回调不应改变 source_data 状态").isEqualTo("PENDING");
|
||
}
|
||
|
||
// ------------------------------------------------------------------ 测试 2: 超出重试上限 → FAILED --
|
||
|
||
@Test
|
||
@DisplayName("超出最大重试次数后 → job.status=FAILED,source_data.status=PENDING")
|
||
void failedCallback_exceedsMaxRetries_jobBecomesFailedAndSourceReverts() {
|
||
// 将 retry_count 设为 max_retries-1(再失败一次就超限)
|
||
jdbcTemplate.execute(
|
||
"UPDATE video_process_job SET retry_count = 2, max_retries = 3, " +
|
||
"status = 'RETRYING' WHERE id = " + jobId);
|
||
|
||
// 发送最后一次 FAILED 回调(retry_count 变为 3 = max_retries → 超限)
|
||
ResponseEntity<Map> resp = sendCallback(jobId, "FAILED", null, "ffmpeg 处理超时");
|
||
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||
|
||
// 验证 job → FAILED
|
||
Map<String, Object> jobRow = jdbcTemplate.queryForMap(
|
||
"SELECT status, retry_count, error_message FROM video_process_job WHERE id = ?", jobId);
|
||
assertThat(jobRow.get("status")).as("超出重试上限后 job 应为 FAILED").isEqualTo("FAILED");
|
||
assertThat(((Number) jobRow.get("retry_count")).intValue()).isEqualTo(3);
|
||
assertThat(jobRow.get("error_message")).isEqualTo("ffmpeg 处理超时");
|
||
|
||
// 验证 source_data → PENDING(管理员可重新处理)
|
||
String sourceStatus = jdbcTemplate.queryForObject(
|
||
"SELECT status FROM source_data WHERE id = ?", String.class, sourceId);
|
||
assertThat(sourceStatus).as("超出重试上限后 source_data 应回退为 PENDING").isEqualTo("PENDING");
|
||
}
|
||
|
||
// ------------------------------------------------------------------ 测试 3: 管理员重置 --
|
||
|
||
@Test
|
||
@DisplayName("管理员重置 FAILED 任务 → job.status=PENDING,retryCount=0")
|
||
void resetFailedJob_succeeds() {
|
||
// 先将任务置为 FAILED 状态
|
||
jdbcTemplate.execute(
|
||
"UPDATE video_process_job SET status = 'FAILED', retry_count = 3 WHERE id = " + jobId);
|
||
|
||
// 重置
|
||
HttpHeaders headers = new HttpHeaders();
|
||
headers.set("Authorization", "Bearer " + ADMIN_TOKEN);
|
||
ResponseEntity<Map> resp = restTemplate.exchange(
|
||
baseUrl("/label/api/video/jobs/" + jobId + "/reset"),
|
||
HttpMethod.POST,
|
||
new HttpEntity<>(headers),
|
||
Map.class);
|
||
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
|
||
|
||
// 验证
|
||
Map<String, Object> jobRow = jdbcTemplate.queryForMap(
|
||
"SELECT status, retry_count FROM video_process_job WHERE id = ?", jobId);
|
||
assertThat(jobRow.get("status")).isEqualTo("PENDING");
|
||
assertThat(((Number) jobRow.get("retry_count")).intValue()).isEqualTo(0);
|
||
}
|
||
|
||
// ------------------------------------------------------------------ 工具方法 --
|
||
|
||
private ResponseEntity<Map> sendCallback(Long jobId, String status,
|
||
String outputPath, String errorMessage) {
|
||
Map<String, Object> body;
|
||
if ("SUCCESS".equals(status)) {
|
||
body = Map.of("jobId", jobId, "status", status, "outputPath", outputPath);
|
||
} else {
|
||
body = Map.of("jobId", jobId, "status", status, "errorMessage",
|
||
errorMessage != null ? errorMessage : "");
|
||
}
|
||
|
||
HttpHeaders headers = new HttpHeaders();
|
||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||
return restTemplate.exchange(
|
||
baseUrl("/label/api/video/callback"),
|
||
HttpMethod.POST,
|
||
new HttpEntity<>(body, headers),
|
||
Map.class);
|
||
}
|
||
}
|