package com.label.service; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.label.common.exception.BusinessException; import com.label.common.shiro.TokenPrincipal; import com.label.common.statemachine.StateValidator; import com.label.common.statemachine.TaskStatus; import com.label.entity.AnnotationTask; import com.label.entity.AnnotationTaskHistory; import com.label.mapper.AnnotationTaskMapper; import com.label.mapper.TaskHistoryMapper; import com.label.util.RedisUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * 任务领取/放弃/重领服务。 * * 并发安全设计: * 1. Redis SET NX 作为分布式预锁(TTL 30s),快速拒绝并发请求 * 2. DB UPDATE WHERE status='UNCLAIMED' 作为兜底原子操作 * 两层防护确保同一任务只有一人可领取 */ @Slf4j @Service @RequiredArgsConstructor public class TaskClaimService { /** Redis 分布式锁 TTL(秒) */ private static final long CLAIM_LOCK_TTL = 30L; private final AnnotationTaskMapper taskMapper; private final TaskHistoryMapper historyMapper; private final RedisService redisService; // ------------------------------------------------------------------ 领取 -- /** * 领取任务(双重防护:Redis NX + DB 原子更新)。 * * @param taskId 任务 ID * @param principal 当前用户 * @throws BusinessException TASK_CLAIMED(409) 任务已被他人领取 */ @Transactional public void claim(Long taskId, TokenPrincipal principal) { String lockKey = RedisUtil.taskClaimKey(taskId); // 1. Redis SET NX 预锁(快速失败) boolean lockAcquired = redisService.setIfAbsent( lockKey, principal.getUserId().toString(), CLAIM_LOCK_TTL); if (!lockAcquired) { throw new BusinessException("TASK_CLAIMED", "任务已被他人领取,请选择其他任务", HttpStatus.CONFLICT); } try { // 2. DB 原子更新(WHERE status='UNCLAIMED' 兜底) int affected = taskMapper.claimTask(taskId, principal.getUserId(), principal.getCompanyId()); if (affected == 0) { // DB 更新失败说明任务状态已变,清除刚设置的锁 redisService.delete(lockKey); throw new BusinessException("TASK_CLAIMED", "任务已被他人领取,请选择其他任务", HttpStatus.CONFLICT); } // 3. 写入状态历史 insertHistory(taskId, principal.getCompanyId(), "UNCLAIMED", "IN_PROGRESS", principal.getUserId(), principal.getRole(), null); log.info("任务领取成功: taskId={}, userId={}", taskId, principal.getUserId()); } catch (BusinessException e) { throw e; // 业务异常直接上抛,锁已在上方清除 } catch (Exception e) { // DB 写入异常(含 insertHistory 失败):清除 Redis 锁,事务回滚 redisService.delete(lockKey); throw e; } } // ------------------------------------------------------------------ 放弃 -- /** * 放弃任务(IN_PROGRESS → UNCLAIMED)。 * * @param taskId 任务 ID * @param principal 当前用户 */ @Transactional public void unclaim(Long taskId, TokenPrincipal principal) { AnnotationTask task = taskMapper.selectById(taskId); validateTaskExists(task, taskId); StateValidator.assertTransition(TaskStatus.TRANSITIONS, TaskStatus.valueOf(task.getStatus()), TaskStatus.UNCLAIMED); taskMapper.update(null, new LambdaUpdateWrapper() .eq(AnnotationTask::getId, taskId) .set(AnnotationTask::getStatus, "UNCLAIMED") .set(AnnotationTask::getClaimedBy, null) .set(AnnotationTask::getClaimedAt, null)); // 清除 Redis 分布式锁 redisService.delete(RedisUtil.taskClaimKey(taskId)); insertHistory(taskId, principal.getCompanyId(), "IN_PROGRESS", "UNCLAIMED", principal.getUserId(), principal.getRole(), null); } // ------------------------------------------------------------------ 重领 -- /** * 重领任务(REJECTED → IN_PROGRESS,仅原领取人可重领)。 * * @param taskId 任务 ID * @param principal 当前用户 */ @Transactional public void reclaim(Long taskId, TokenPrincipal principal) { AnnotationTask task = taskMapper.selectById(taskId); validateTaskExists(task, taskId); if (!"REJECTED".equals(task.getStatus())) { throw new BusinessException("INVALID_STATE_TRANSITION", "只有 REJECTED 状态的任务可以重领", HttpStatus.CONFLICT); } if (!principal.getUserId().equals(task.getClaimedBy())) { throw new BusinessException("FORBIDDEN", "只有原领取人可以重领该任务", HttpStatus.FORBIDDEN); } StateValidator.assertTransition(TaskStatus.TRANSITIONS, TaskStatus.valueOf(task.getStatus()), TaskStatus.IN_PROGRESS); taskMapper.update(null, new LambdaUpdateWrapper() .eq(AnnotationTask::getId, taskId) .eq(AnnotationTask::getStatus, "REJECTED") .set(AnnotationTask::getStatus, "IN_PROGRESS") .set(AnnotationTask::getClaimedAt, java.time.LocalDateTime.now())); // 重新设置 Redis 锁(防止并发再次争抢) redisService.setIfAbsent( RedisUtil.taskClaimKey(taskId), principal.getUserId().toString(), CLAIM_LOCK_TTL); insertHistory(taskId, principal.getCompanyId(), "REJECTED", "IN_PROGRESS", principal.getUserId(), principal.getRole(), null); } // ------------------------------------------------------------------ 私有工具 -- private void validateTaskExists(AnnotationTask task, Long taskId) { if (task == null) { throw new BusinessException("NOT_FOUND", "任务不存在: " + taskId, HttpStatus.NOT_FOUND); } } /** * 向 annotation_task_history 追加一条历史记录(仅 INSERT,禁止 UPDATE/DELETE)。 */ public void insertHistory(Long taskId, Long companyId, String fromStatus, String toStatus, Long operatorId, String operatorRole, String comment) { historyMapper.insert(AnnotationTaskHistory.builder() .taskId(taskId) .companyId(companyId) .fromStatus(fromStatus) .toStatus(toStatus) .operatorId(operatorId) .operatorRole(operatorRole) .comment(comment) .build()); } }