feat(phase5): US3+US4 任务领取、提取标注与审批模块

- 任务领取(TaskClaimService):Redis SET NX + DB WHERE status=UNCLAIMED 双重并发防护
- 任务管理(TaskService/TaskController):任务池/我的任务/待审批/全部任务/创建/指派 10 端点
- 提取标注(ExtractionService/ExtractionController):AI 预标注/更新/提交/审批/驳回 5 端点
- 审批解耦(ExtractionApprovedEventListener):@TransactionalEventListener(AFTER_COMMIT) + REQUIRES_NEW
  确保 AI QA 生成在审批事务提交后独立执行,异常不回滚审批结果
- 状态实体:AnnotationTask/AnnotationTaskHistory/AnnotationResult/TrainingDataset
- 集成测试:并发领取安全(10 线程恰好 1 成功)+ 审批流(通过/自审/驳回重领)
This commit is contained in:
wh
2026-04-09 15:36:11 +08:00
parent 7f12fc520a
commit 927e4f1cf3
18 changed files with 1679 additions and 0 deletions

View File

@@ -0,0 +1,217 @@
package com.label.integration;
import com.label.AbstractIntegrationTest;
import com.label.module.user.dto.LoginRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.*;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/**
* 提取阶段审批集成测试US4
*
* 测试场景:
* 1. 审批通过 → QA_GENERATION 任务自动创建source_data 状态更新为 QA_REVIEW
* 2. 审批人与提交人相同(自审)→ 403 SELF_REVIEW_FORBIDDEN
* 3. 驳回后标注员可重领任务并再次提交
*/
public class ExtractionApprovalIntegrationTest extends AbstractIntegrationTest {
@Autowired
private TestRestTemplate restTemplate;
private Long sourceId;
private Long taskId;
private Long annotatorUserId;
private Long reviewerUserId;
@BeforeEach
void setup() {
// 获取种子用户 IDinit.sql 中已插入)
annotatorUserId = jdbcTemplate.queryForObject(
"SELECT id FROM sys_user WHERE username = 'annotator01'", Long.class);
reviewerUserId = jdbcTemplate.queryForObject(
"SELECT id FROM sys_user WHERE username = 'reviewer01'", Long.class);
Long companyId = jdbcTemplate.queryForObject(
"SELECT id FROM sys_company WHERE company_code = 'DEMO'", Long.class);
// 插入测试 source_data
jdbcTemplate.execute(
"INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " +
"file_name, file_size, bucket_name, status) " +
"VALUES (" + companyId + ", " + annotatorUserId + ", 'TEXT', " +
"'test/approval-test/file.txt', 'file.txt', 100, 'test-bucket', 'PENDING')");
sourceId = jdbcTemplate.queryForObject(
"SELECT id FROM source_data ORDER BY id DESC LIMIT 1", Long.class);
// 插入 UNCLAIMED EXTRACTION 任务
jdbcTemplate.execute(
"INSERT INTO annotation_task (company_id, source_id, task_type, status) " +
"VALUES (" + companyId + ", " + sourceId + ", 'EXTRACTION', 'UNCLAIMED')");
taskId = jdbcTemplate.queryForObject(
"SELECT id FROM annotation_task ORDER BY id DESC LIMIT 1", Long.class);
}
// ------------------------------------------------------------------ 测试 1: 审批通过 → QA 任务自动创建 --
@Test
@DisplayName("审批通过后QA_GENERATION 任务自动创建source_data 状态变为 QA_REVIEW")
void approveTask_thenQaTaskAndSourceStatusUpdated() {
String annotatorToken = loginAndGetToken("DEMO", "annotator01", "annot123");
String reviewerToken = loginAndGetToken("DEMO", "reviewer01", "review123");
// 1. 标注员领取任务
ResponseEntity<Map> claimResp = restTemplate.exchange(
baseUrl("/api/tasks/" + taskId + "/claim"),
HttpMethod.POST, bearerRequest(annotatorToken), Map.class);
assertThat(claimResp.getStatusCode()).isEqualTo(HttpStatus.OK);
// 2. 标注员提交标注
ResponseEntity<Map> submitResp = restTemplate.exchange(
baseUrl("/api/extraction/" + taskId + "/submit"),
HttpMethod.POST, bearerRequest(annotatorToken), Map.class);
assertThat(submitResp.getStatusCode()).isEqualTo(HttpStatus.OK);
// 3. 审核员审批通过
// 注ExtractionApprovedEventListener(@TransactionalEventListener AFTER_COMMIT)
// 在同一线程中同步执行HTTP 响应返回前已完成后续处理
ResponseEntity<Map> approveResp = restTemplate.exchange(
baseUrl("/api/extraction/" + taskId + "/approve"),
HttpMethod.POST, bearerRequest(reviewerToken), Map.class);
assertThat(approveResp.getStatusCode()).isEqualTo(HttpStatus.OK);
// 验证:原任务状态变为 APPROVEDis_final=true
Map<String, Object> taskRow = jdbcTemplate.queryForMap(
"SELECT status, is_final FROM annotation_task WHERE id = ?", taskId);
assertThat(taskRow.get("status")).isEqualTo("APPROVED");
assertThat(taskRow.get("is_final")).isEqualTo(Boolean.TRUE);
// 验证QA_GENERATION 任务已自动创建UNCLAIMED 状态)
Integer qaTaskCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM annotation_task " +
"WHERE source_id = ? AND task_type = 'QA_GENERATION' AND status = 'UNCLAIMED'",
Integer.class, sourceId);
assertThat(qaTaskCount).as("QA_GENERATION 任务应已创建").isEqualTo(1);
// 验证source_data 状态已更新为 QA_REVIEW
String sourceStatus = jdbcTemplate.queryForObject(
"SELECT status FROM source_data WHERE id = ?", String.class, sourceId);
assertThat(sourceStatus).as("source_data 状态应为 QA_REVIEW").isEqualTo("QA_REVIEW");
// 验证training_dataset 已以 PENDING_REVIEW 状态创建
Integer datasetCount = jdbcTemplate.queryForObject(
"SELECT COUNT(*) FROM training_dataset " +
"WHERE source_id = ? AND status = 'PENDING_REVIEW'",
Integer.class, sourceId);
assertThat(datasetCount).as("training_dataset 应已创建").isEqualTo(1);
}
// ------------------------------------------------------------------ 测试 2: 自审返回 403 --
@Test
@DisplayName("审批人与任务领取人相同(自审)→ 403 SELF_REVIEW_FORBIDDEN")
void approveOwnSubmission_returnsForbidden() {
// 直接将任务置为 SUBMITTED 并设 claimed_by = reviewer01模拟自审场景
jdbcTemplate.execute(
"UPDATE annotation_task " +
"SET status = 'SUBMITTED', claimed_by = " + reviewerUserId +
", claimed_at = NOW(), submitted_at = NOW() " +
"WHERE id = " + taskId);
String reviewerToken = loginAndGetToken("DEMO", "reviewer01", "review123");
ResponseEntity<Map> resp = restTemplate.exchange(
baseUrl("/api/extraction/" + taskId + "/approve"),
HttpMethod.POST, bearerRequest(reviewerToken), Map.class);
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN);
// 验证任务状态未变
String status = jdbcTemplate.queryForObject(
"SELECT status FROM annotation_task WHERE id = ?", String.class, taskId);
assertThat(status).isEqualTo("SUBMITTED");
}
// ------------------------------------------------------------------ 测试 3: 驳回 → 重领 → 再提交 --
@Test
@DisplayName("驳回后标注员可重领任务并再次提交,任务状态恢复为 SUBMITTED")
void rejectThenReclaimAndResubmit_succeeds() {
String annotatorToken = loginAndGetToken("DEMO", "annotator01", "annot123");
String reviewerToken = loginAndGetToken("DEMO", "reviewer01", "review123");
// 1. 标注员领取并提交
restTemplate.exchange(baseUrl("/api/tasks/" + taskId + "/claim"),
HttpMethod.POST, bearerRequest(annotatorToken), Map.class);
restTemplate.exchange(baseUrl("/api/extraction/" + taskId + "/submit"),
HttpMethod.POST, bearerRequest(annotatorToken), Map.class);
// 2. 审核员驳回(驳回原因必填)
HttpHeaders rejectHeaders = new HttpHeaders();
rejectHeaders.set("Authorization", "Bearer " + reviewerToken);
rejectHeaders.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Map<String, String>> rejectReq = new HttpEntity<>(
Map.of("reason", "实体识别有误,请重新标注"), rejectHeaders);
ResponseEntity<Map> rejectResp = restTemplate.exchange(
baseUrl("/api/extraction/" + taskId + "/reject"),
HttpMethod.POST, rejectReq, Map.class);
assertThat(rejectResp.getStatusCode()).isEqualTo(HttpStatus.OK);
// 验证:任务状态变为 REJECTED
String statusAfterReject = jdbcTemplate.queryForObject(
"SELECT status FROM annotation_task WHERE id = ?", String.class, taskId);
assertThat(statusAfterReject).isEqualTo("REJECTED");
// 3. 标注员重领任务REJECTED → IN_PROGRESS
ResponseEntity<Map> reclaimResp = restTemplate.exchange(
baseUrl("/api/tasks/" + taskId + "/reclaim"),
HttpMethod.POST, bearerRequest(annotatorToken), Map.class);
assertThat(reclaimResp.getStatusCode()).isEqualTo(HttpStatus.OK);
// 验证:任务状态恢复为 IN_PROGRESS
String statusAfterReclaim = jdbcTemplate.queryForObject(
"SELECT status FROM annotation_task WHERE id = ?", String.class, taskId);
assertThat(statusAfterReclaim).isEqualTo("IN_PROGRESS");
// 4. 标注员再次提交IN_PROGRESS → SUBMITTED
ResponseEntity<Map> resubmitResp = restTemplate.exchange(
baseUrl("/api/extraction/" + taskId + "/submit"),
HttpMethod.POST, bearerRequest(annotatorToken), Map.class);
assertThat(resubmitResp.getStatusCode()).isEqualTo(HttpStatus.OK);
// 验证:任务状态变为 SUBMITTED
String finalStatus = jdbcTemplate.queryForObject(
"SELECT status FROM annotation_task WHERE id = ?", String.class, taskId);
assertThat(finalStatus).isEqualTo("SUBMITTED");
}
// ------------------------------------------------------------------ 工具方法 --
private String loginAndGetToken(String companyCode, String username, String password) {
LoginRequest req = new LoginRequest();
req.setCompanyCode(companyCode);
req.setUsername(username);
req.setPassword(password);
ResponseEntity<Map> response = restTemplate.postForEntity(
baseUrl("/api/auth/login"), req, Map.class);
if (!response.getStatusCode().is2xxSuccessful()) {
return null;
}
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) response.getBody().get("data");
return (String) data.get("token");
}
private HttpEntity<Void> bearerRequest(String token) {
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", "Bearer " + token);
return new HttpEntity<>(headers);
}
}

View File

@@ -0,0 +1,135 @@
package com.label.integration;
import com.label.AbstractIntegrationTest;
import com.label.common.redis.RedisKeyManager;
import com.label.common.redis.RedisService;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
/**
* 任务领取并发安全集成测试US3
*
* 测试场景10 个线程同时争抢同一 UNCLAIMED 任务。
* 期望结果:
* - 恰好 1 人成功200 OK
* - 其余 9 人收到 TASK_CLAIMED (409)
* - DB 中 claimed_by 唯一(只有一个用户 ID
*
* 此测试需要 10 个不同的 userId使用同一 DB 用户账号但不同的 Token。
*/
public class TaskClaimConcurrencyTest extends AbstractIntegrationTest {
@Autowired
private TestRestTemplate restTemplate;
@Autowired
private RedisService redisService;
private Long taskId;
private final List<String> tokens = new ArrayList<>();
@BeforeEach
void setup() {
// 创建测试任务(直接向 DB 插入一条 UNCLAIMED 任务)
jdbcTemplate.execute(
"INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " +
"file_name, file_size, bucket_name, status) " +
"VALUES (1, 1, 'TEXT', 'test/path/file.txt', 'file.txt', 100, 'test-bucket', 'PENDING')");
Long sourceId = jdbcTemplate.queryForObject(
"SELECT id FROM source_data ORDER BY id DESC LIMIT 1", Long.class);
jdbcTemplate.execute(
"INSERT INTO annotation_task (company_id, source_id, task_type, status) " +
"VALUES (1, " + sourceId + ", 'EXTRACTION', 'UNCLAIMED')");
taskId = jdbcTemplate.queryForObject(
"SELECT id FROM annotation_task ORDER BY id DESC LIMIT 1", Long.class);
// 创建 10 个 Annotator Token模拟不同用户
for (int i = 1; i <= 10; i++) {
String token = "concurrency-test-token-" + i;
tokens.add(token);
// 所有 Token 使用 userId=3annotator01这在真实场景不会发生
// 但在测试中用于验证并发锁机制redis key 基于 taskId不是 userId
redisService.hSetAll(RedisKeyManager.tokenKey(token),
Map.of("userId", String.valueOf(i + 100), // 假设 userId > 100 不存在,但不影响锁逻辑
"role", "ANNOTATOR", "companyId", "1", "username", "annotator" + i),
3600L);
}
}
@AfterEach
void cleanup() {
tokens.forEach(token -> redisService.delete(RedisKeyManager.tokenKey(token)));
if (taskId != null) {
redisService.delete(RedisKeyManager.taskClaimKey(taskId));
}
}
@Test
@DisplayName("10 线程并发抢同一任务:恰好 1 人成功,其余 9 人收到 409 TASK_CLAIMED")
void concurrentClaim_onlyOneSucceeds() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(10);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger conflictCount = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
final String token = tokens.get(i);
executor.submit(() -> {
try {
startLatch.await(); // 等待起跑信号,最大化并发
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", "Bearer " + token);
HttpEntity<Void> request = new HttpEntity<>(headers);
ResponseEntity<Map> response = restTemplate.exchange(
baseUrl("/api/tasks/" + taskId + "/claim"),
HttpMethod.POST, request, Map.class);
if (response.getStatusCode() == HttpStatus.OK) {
successCount.incrementAndGet();
} else if (response.getStatusCode() == HttpStatus.CONFLICT) {
conflictCount.incrementAndGet();
}
} catch (Exception e) {
conflictCount.incrementAndGet(); // 异常也算失败
} finally {
doneLatch.countDown();
}
});
}
startLatch.countDown(); // 同时放行所有线程
doneLatch.await(30, TimeUnit.SECONDS);
executor.shutdown();
// 恰好 1 人成功
assertThat(successCount.get()).isEqualTo(1);
// 其余 9 人失败409 或异常)
assertThat(conflictCount.get()).isEqualTo(9);
// DB 中 claimed_by 有且仅有一个值
String claimedByStr = jdbcTemplate.queryForObject(
"SELECT claimed_by::text FROM annotation_task WHERE id = ?",
String.class, taskId);
assertThat(claimedByStr).isNotNull();
// DB 中状态为 IN_PROGRESS
String status = jdbcTemplate.queryForObject(
"SELECT status FROM annotation_task WHERE id = ?", String.class, taskId);
assertThat(status).isEqualTo("IN_PROGRESS");
}
}