fix+refactor: 代码审查修复(11 项安全/并发缺陷)+ log.debug → log.info(21 处)

代码审查修复:
- MybatisPlusConfig: video_process_job 加入 IGNORED_TABLES(修复回调路径多租户过滤导致全部回调静默丢失)
- TokenFilter: catch(Exception) 替代 catch(NumberFormatException),防止空指针泄漏为 500
- VideoController: createJob 空指针防护 + handleCallback 共享密钥校验(X-Callback-Secret)
- VideoProcessService: handleCallback 显式校验 companyId 非空;triggerAi 失败改为 error 级日志
- ExtractionService/QaService: validateAndGetTask 显式校验 companyId(纵深防御)
- TaskClaimService: reclaim 增加原子 WHERE status='REJECTED';claim 异常时释放 Redis 锁
- TaskService: reassign 校验 targetUserId 属于同一租户
- AuthService: user:sessions:{userId} Set 设置滑动 TTL,防止 Token 无限累积
- ExportService/SourceService: RustFS + DB 非原子操作增加失败回滚清理
- SourceService: getOriginalFilename 使用 Paths.get().getFileName() 防路径遍历

日志规范:
- 11 个 Service 类 21 处 log.debug 替换为 log.info
This commit is contained in:
wh
2026-04-09 19:42:20 +08:00
parent d231180bff
commit c2a254cba4
17 changed files with 120 additions and 58 deletions

View File

@@ -19,8 +19,9 @@ public class MybatisPlusConfig {
// Tables that do NOT need tenant isolation (either global or tenant root tables)
private static final List<String> IGNORED_TABLES = Arrays.asList(
"sys_company", // the tenant root table itself
"sys_config" // has company_id=NULL for global defaults; service handles this manually
"sys_company", // the tenant root table itself
"sys_config", // has company_id=NULL for global defaults; service handles this manually
"video_process_job" // accessed by unauthenticated callback endpoint; service validates companyId manually
);
@Bean

View File

@@ -81,7 +81,7 @@ public class TokenFilter extends OncePerRequestFilter {
request.setAttribute("__token_principal__", principal);
filterChain.doFilter(request, response);
} catch (NumberFormatException e) {
} catch (Exception e) {
log.error("解析 Token 数据失败: {}", e.getMessage());
writeUnauthorized(response, "令牌数据格式错误");
} finally {

View File

@@ -1,22 +1,28 @@
package com.label.common.storage;
import lombok.extern.slf4j.Slf4j;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest;
import jakarta.annotation.PostConstruct;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
@Slf4j
@Component
public class RustFsClient {

View File

@@ -55,7 +55,7 @@ public class ExtractionApprovedEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void onExtractionApproved(ExtractionApprovedEvent event) {
log.debug("处理提取审批通过事件: taskId={}, sourceId={}", event.getTaskId(), event.getSourceId());
log.info("处理提取审批通过事件: taskId={}, sourceId={}", event.getTaskId(), event.getSourceId());
// 设置多租户上下文(新事务中 ThreadLocal 已清除)
CompanyContext.set(event.getCompanyId());
@@ -114,7 +114,7 @@ public class ExtractionApprovedEventListener {
// 4. 更新 source_data 状态为 QA_REVIEW
sourceDataMapper.updateStatus(event.getSourceId(), "QA_REVIEW", event.getCompanyId());
log.debug("审批通过后续处理完成: taskId={}, 新 QA 任务已创建", event.getTaskId());
log.info("审批通过后续处理完成: taskId={}, 新 QA 任务已创建", event.getTaskId());
}
/**

View File

@@ -248,7 +248,7 @@ public class ExtractionService {
*/
private AnnotationTask validateAndGetTask(Long taskId, Long companyId) {
AnnotationTask task = taskMapper.selectById(taskId);
if (task == null) {
if (task == null || !companyId.equals(task.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "任务不存在: " + taskId, HttpStatus.NOT_FOUND);
}
return task;

View File

@@ -193,7 +193,7 @@ public class QaService {
"SUBMITTED", "APPROVED",
principal.getUserId(), principal.getRole(), null);
log.debug("QA 审批通过,整条流水线完成: taskId={}, sourceId={}", taskId, task.getSourceId());
log.info("QA 审批通过,整条流水线完成: taskId={}, sourceId={}", taskId, task.getSourceId());
}
// ------------------------------------------------------------------ 驳回 --
@@ -237,7 +237,7 @@ public class QaService {
private AnnotationTask validateAndGetTask(Long taskId, Long companyId) {
AnnotationTask task = taskMapper.selectById(taskId);
if (task == null) {
if (task == null || !companyId.equals(task.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "任务不存在: " + taskId, HttpStatus.NOT_FOUND);
}
return task;

View File

@@ -123,7 +123,7 @@ public class SysConfigService {
}
existing.setUpdatedAt(LocalDateTime.now());
configMapper.updateById(existing);
log.debug("公司配置已更新: companyId={}, key={}, value={}", companyId, configKey, value);
log.info("公司配置已更新: companyId={}, key={}, value={}", companyId, configKey, value);
return existing;
} else {
SysConfig cfg = new SysConfig();
@@ -132,7 +132,7 @@ public class SysConfigService {
cfg.setConfigValue(value);
cfg.setDescription(description);
configMapper.insert(cfg);
log.debug("公司配置已创建: companyId={}, key={}, value={}", companyId, configKey, value);
log.info("公司配置已创建: companyId={}, key={}, value={}", companyId, configKey, value);
return cfg;
}
}

View File

@@ -90,14 +90,24 @@ public class ExportService {
new ByteArrayInputStream(jsonlBytes), jsonlBytes.length,
"application/jsonl");
// 插入 export_batch 记录
// 插入 export_batch 记录(若 DB 写入失败,尝试清理 RustFS 孤儿文件)
ExportBatch batch = new ExportBatch();
batch.setCompanyId(principal.getCompanyId());
batch.setBatchUuid(batchUuid);
batch.setSampleCount(samples.size());
batch.setDatasetFilePath(filePath);
batch.setFinetuneStatus("NOT_STARTED");
exportBatchMapper.insert(batch);
try {
exportBatchMapper.insert(batch);
} catch (Exception e) {
// DB 插入失败:尝试删除已上传的 RustFS 文件,防止产生孤儿文件
try {
rustFsClient.delete(EXPORT_BUCKET, filePath);
} catch (Exception deleteEx) {
log.error("DB 写入失败后清理 RustFS 文件亦失败,孤儿文件: {}/{}", EXPORT_BUCKET, filePath, deleteEx);
}
throw e;
}
// 批量更新 training_dataset.export_batch_id + exported_at
datasetMapper.update(null, new LambdaUpdateWrapper<TrainingDataset>()
@@ -106,7 +116,7 @@ public class ExportService {
.set(TrainingDataset::getExportedAt, LocalDateTime.now())
.set(TrainingDataset::getUpdatedAt, LocalDateTime.now()));
log.debug("导出批次已创建: batchId={}, sampleCount={}, path={}",
log.info("导出批次已创建: batchId={}, sampleCount={}, path={}",
batch.getId(), samples.size(), filePath);
return batch;
}

View File

@@ -68,7 +68,7 @@ public class FinetuneService {
exportBatchMapper.updateFinetuneInfo(batchId,
response.getJobId(), "RUNNING", principal.getCompanyId());
log.debug("微调任务已提交: batchId={}, glmJobId={}", batchId, response.getJobId());
log.info("微调任务已提交: batchId={}, glmJobId={}", batchId, response.getJobId());
return Map.of(
"glmJobId", response.getJobId(),

View File

@@ -62,7 +62,9 @@ public class SourceService {
throw new BusinessException("INVALID_TYPE", "不支持的资料类型: " + dataType, HttpStatus.BAD_REQUEST);
}
String originalName = file.getOriginalFilename() != null ? file.getOriginalFilename() : "unknown";
// 提取纯文件名,防止路径遍历(如 ../../admin/secret.txt
String rawName = file.getOriginalFilename() != null ? file.getOriginalFilename() : "unknown";
String originalName = java.nio.file.Paths.get(rawName).getFileName().toString();
// 1. 先插入占位记录,拿到自增 ID
SourceData source = new SourceData();
@@ -89,12 +91,21 @@ public class SourceService {
throw new BusinessException("UPLOAD_FAILED", "文件上传失败,请重试", HttpStatus.INTERNAL_SERVER_ERROR);
}
// 4. 更新 filePath
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, source.getId())
.set(SourceData::getFilePath, objectKey));
// 4. 更新 filePath(若失败则清理 RustFS 孤儿文件)
try {
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, source.getId())
.set(SourceData::getFilePath, objectKey));
} catch (Exception e) {
try {
rustFsClient.delete(bucket, objectKey);
} catch (Exception deleteEx) {
log.error("DB 更新失败后清理 RustFS 文件亦失败,孤儿文件: {}/{}", bucket, objectKey, deleteEx);
}
throw e;
}
log.debug("资料上传成功: id={}, key={}", source.getId(), objectKey);
log.info("资料上传成功: id={}, key={}", source.getId(), objectKey);
return toUploadResponse(source, objectKey);
}
@@ -190,7 +201,7 @@ public class SourceService {
}
sourceDataMapper.deleteById(id);
log.debug("资料删除成功: id={}", id);
log.info("资料删除成功: id={}", id);
}
// ------------------------------------------------------------------ 私有工具 --

View File

@@ -57,20 +57,28 @@ public class TaskClaimService {
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取,请选择其他任务", HttpStatus.CONFLICT);
}
// 2. DB 原子更新WHERE status='UNCLAIMED' 兜底)
int affected = taskMapper.claimTask(taskId, principal.getUserId(), principal.getCompanyId());
if (affected == 0) {
// DB 更新失败说明任务状态已变,清除刚设置的锁
try {
// 2. DB 原子更新WHERE status='UNCLAIMED' 兜底)
int affected = taskMapper.claimTask(taskId, principal.getUserId(), principal.getCompanyId());
if (affected == 0) {
// DB 更新失败说明任务状态已变,清除刚设置的锁
redisService.delete(lockKey);
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取,请选择其他任务", HttpStatus.CONFLICT);
}
// 3. 写入状态历史
insertHistory(taskId, principal.getCompanyId(),
"UNCLAIMED", "IN_PROGRESS",
principal.getUserId(), principal.getRole(), null);
log.info("任务领取成功: taskId={}, userId={}", taskId, principal.getUserId());
} catch (BusinessException e) {
throw e; // 业务异常直接上抛,锁已在上方清除
} catch (Exception e) {
// DB 写入异常(含 insertHistory 失败):清除 Redis 锁,事务回滚
redisService.delete(lockKey);
throw new BusinessException("TASK_CLAIMED", "任务已被他人领取,请选择其他任务", HttpStatus.CONFLICT);
throw e;
}
// 3. 写入状态历史
insertHistory(taskId, principal.getCompanyId(),
"UNCLAIMED", "IN_PROGRESS",
principal.getUserId(), principal.getRole(), null);
log.debug("任务领取成功: taskId={}, userId={}", taskId, principal.getUserId());
}
// ------------------------------------------------------------------ 放弃 --
@@ -131,6 +139,7 @@ public class TaskClaimService {
taskMapper.update(null, new LambdaUpdateWrapper<AnnotationTask>()
.eq(AnnotationTask::getId, taskId)
.eq(AnnotationTask::getStatus, "REJECTED")
.set(AnnotationTask::getStatus, "IN_PROGRESS")
.set(AnnotationTask::getClaimedAt, java.time.LocalDateTime.now()));

View File

@@ -48,7 +48,7 @@ public class TaskService {
task.setStatus("UNCLAIMED");
task.setIsFinal(false);
taskMapper.insert(task);
log.debug("任务已创建: id={}, type={}, sourceId={}", task.getId(), taskType, sourceId);
log.info("任务已创建: id={}, type={}, sourceId={}", task.getId(), taskType, sourceId);
return task;
}
@@ -156,7 +156,7 @@ public class TaskService {
@Transactional
public void reassign(Long taskId, Long targetUserId, TokenPrincipal principal) {
AnnotationTask task = taskMapper.selectById(taskId);
if (task == null) {
if (task == null || !principal.getCompanyId().equals(task.getCompanyId())) {
throw new BusinessException("NOT_FOUND", "任务不存在: " + taskId, HttpStatus.NOT_FOUND);
}

View File

@@ -86,9 +86,12 @@ public class AuthService {
redisService.hSetAll(RedisKeyManager.tokenKey(token), tokenData, tokenTtlSeconds);
// 将 token 加入该用户的活跃会话集合(用于角色变更时批量更新/失效)
redisService.sAdd(RedisKeyManager.userSessionsKey(user.getId()), token);
String sessionsKey = RedisKeyManager.userSessionsKey(user.getId());
redisService.sAdd(sessionsKey, token);
// 防止 Set 无限增长TTL = token 有效期(最后一次登录时滑动续期)
redisService.expire(sessionsKey, tokenTtlSeconds);
log.debug("用户登录成功: companyCode={}, username={}", request.getCompanyCode(), request.getUsername());
log.info("用户登录成功: companyCode={}, username={}", request.getCompanyCode(), request.getUsername());
return new LoginResponse(token, user.getId(), user.getUsername(), user.getRole(), tokenTtlSeconds);
}
@@ -107,7 +110,7 @@ public class AuthService {
redisService.sRemove(RedisKeyManager.userSessionsKey(Long.parseLong(userId)), token);
} catch (NumberFormatException ignored) {}
}
log.debug("用户退出Token 已删除: {}", token);
log.info("用户退出Token 已删除: {}", token);
}
}

View File

@@ -72,7 +72,7 @@ public class UserService {
user.setStatus("ACTIVE");
userMapper.insert(user);
log.debug("用户已创建: userId={}, username={}, role={}", user.getId(), username, role);
log.info("用户已创建: userId={}, username={}, role={}", user.getId(), username, role);
return user;
}
@@ -133,7 +133,7 @@ public class UserService {
// 3. 删除权限缓存(如 Shiro 缓存存在)
redisService.delete(RedisKeyManager.userPermKey(userId));
log.debug("用户角色已变更: userId={}, newRole={}, 更新 {} 个活跃 Token", userId, newRole, tokens.size());
log.info("用户角色已变更: userId={}, newRole={}, 更新 {} 个活跃 Token", userId, newRole, tokens.size());
}
// ------------------------------------------------------------------ 变更状态 --
@@ -163,7 +163,7 @@ public class UserService {
Set<String> tokens = redisService.sMembers(RedisKeyManager.userSessionsKey(userId));
tokens.forEach(token -> redisService.delete(RedisKeyManager.tokenKey(token)));
redisService.delete(RedisKeyManager.userSessionsKey(userId));
log.debug("账号已禁用,已删除 {} 个活跃 Token: userId={}", tokens.size(), userId);
log.info("账号已禁用,已删除 {} 个活跃 Token: userId={}", tokens.size(), userId);
}
// 删除权限缓存

View File

@@ -8,6 +8,7 @@ import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.authz.annotation.RequiresRoles;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@@ -27,13 +28,21 @@ public class VideoController {
private final VideoProcessService videoProcessService;
@Value("${video.callback-secret:}")
private String callbackSecret;
/** POST /api/video/process — 触发视频处理任务 */
@PostMapping("/api/video/process")
@RequiresRoles("ADMIN")
public Result<VideoProcessJob> createJob(@RequestBody Map<String, Object> body,
HttpServletRequest request) {
Long sourceId = Long.parseLong(body.get("sourceId").toString());
String jobType = (String) body.get("jobType");
Object sourceIdVal = body.get("sourceId");
Object jobTypeVal = body.get("jobType");
if (sourceIdVal == null || jobTypeVal == null) {
return Result.failure("INVALID_PARAMS", "sourceId 和 jobType 不能为空");
}
Long sourceId = Long.parseLong(sourceIdVal.toString());
String jobType = jobTypeVal.toString();
String params = body.containsKey("params") ? body.get("params").toString() : null;
TokenPrincipal principal = principal(request);
@@ -68,7 +77,16 @@ public class VideoController {
* { "jobId": 123, "status": "FAILED", "errorMessage": "ffmpeg error: ..." }
*/
@PostMapping("/api/video/callback")
public Result<Void> handleCallback(@RequestBody Map<String, Object> body) {
public Result<Void> handleCallback(@RequestBody Map<String, Object> body,
HttpServletRequest request) {
// 共享密钥校验(配置了 VIDEO_CALLBACK_SECRET 时强制校验)
if (callbackSecret != null && !callbackSecret.isBlank()) {
String provided = request.getHeader("X-Callback-Secret");
if (!callbackSecret.equals(provided)) {
return Result.failure("UNAUTHORIZED", "回调密钥无效");
}
}
Long jobId = Long.parseLong(body.get("jobId").toString());
String status = (String) body.get("status");
String outputPath = body.containsKey("outputPath") ? (String) body.get("outputPath") : null;

View File

@@ -103,7 +103,7 @@ public class VideoProcessService {
}
});
log.debug("视频处理任务已创建AI 将在事务提交后触发): jobId={}, sourceId={}", jobId, sourceId);
log.info("视频处理任务已创建AI 将在事务提交后触发): jobId={}, sourceId={}", jobId, sourceId);
return job;
}
@@ -123,15 +123,16 @@ public class VideoProcessService {
@Transactional
public void handleCallback(Long jobId, String callbackStatus,
String outputPath, String errorMessage) {
// video_process_job 在 IGNORED_TABLES 中(回调无 CompanyContext此处显式校验
VideoProcessJob job = jobMapper.selectById(jobId);
if (job == null) {
if (job == null || job.getCompanyId() == null) {
log.warn("视频处理回调job 不存在jobId={}", jobId);
return;
}
// 幂等:已成功则忽略重复回调
if ("SUCCESS".equals(job.getStatus())) {
log.debug("视频处理回调幂等jobId={} 已为 SUCCESS跳过", jobId);
log.info("视频处理回调幂等jobId={} 已为 SUCCESS跳过", jobId);
return;
}
@@ -175,7 +176,7 @@ public class VideoProcessService {
job.setStatus("PENDING");
job.setRetryCount(0);
log.debug("视频处理任务已重置: jobId={}", jobId);
log.info("视频处理任务已重置: jobId={}", jobId);
return job;
}
@@ -206,7 +207,7 @@ public class VideoProcessService {
.set(SourceData::getStatus, "PENDING")
.set(SourceData::getUpdatedAt, LocalDateTime.now()));
log.debug("视频处理成功jobId={}, sourceId={}", job.getId(), job.getSourceId());
log.info("视频处理成功jobId={}, sourceId={}", job.getId(), job.getSourceId());
}
private void handleFailure(VideoProcessJob job, String errorMessage) {
@@ -274,9 +275,9 @@ public class VideoProcessService {
} else {
aiServiceClient.videoToText(req);
}
log.debug("AI 触发成功: jobId={}", jobId);
log.info("AI 触发成功: jobId={}", jobId);
} catch (Exception e) {
log.warn("触发视频处理 AI 失败jobId={}{}job 保持当前状态等待重试", jobId, e.getMessage());
log.error("触发视频处理 AI 失败jobId={}{}job 保持当前状态,需管理员手动重置", jobId, e.getMessage());
}
}