Phase 4 完成:US2 原始资料上传(SourceData / SourceService / SourceController)

新增:
- SourceData 实体 + SourceDataMapper(含 updateStatus 方法)
- SourceResponse DTO(上传/列表/详情复用)
- SourceService(upload/list/findById/delete,upload 先 INSERT 获取 ID
  再构造 RustFS 路径,delete 仅允许 PENDING 状态)
- SourceController(POST /api/source/upload 返回 201,GET /list,
  GET /{id},DELETE /{id};@RequiresRoles 声明权限)
- SourceIntegrationTest(权限校验、空列表、删除不存在资料、
  已进入流水线资料删除返回 409)
- application.yml 添加 token.ttl-seconds 配置项
This commit is contained in:
wh
2026-04-09 15:21:32 +08:00
parent a28fecd16a
commit 7f12fc520a
6 changed files with 577 additions and 0 deletions

View File

@@ -0,0 +1,81 @@
package com.label.module.source.controller;
import com.label.common.result.PageResult;
import com.label.common.result.Result;
import com.label.common.shiro.TokenPrincipal;
import com.label.module.source.dto.SourceResponse;
import com.label.module.source.service.SourceService;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.shiro.authz.annotation.RequiresRoles;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
/**
* 原始资料管理接口。
*
* 权限设计:
* - 上传 / 列表 / 详情UPLOADER 及以上角色(含 ANNOTATOR、REVIEWER、ADMIN
* - 删除:仅 ADMIN
*/
@RestController
@RequestMapping("/api/source")
@RequiredArgsConstructor
public class SourceController {
private final SourceService sourceService;
/**
* 上传文件multipart/form-data
* 返回 201 Created + 资料摘要。
*/
@PostMapping("/upload")
@RequiresRoles("UPLOADER")
public ResponseEntity<Result<SourceResponse>> upload(
@RequestParam("file") MultipartFile file,
@RequestParam("dataType") String dataType,
HttpServletRequest request) {
TokenPrincipal principal = (TokenPrincipal) request.getAttribute("__token_principal__");
SourceResponse response = sourceService.upload(file, dataType, principal);
return ResponseEntity.status(HttpStatus.CREATED).body(Result.success(response));
}
/**
* 分页查询资料列表。
* UPLOADER 只见自己的资料ADMIN 见全公司资料。
*/
@GetMapping("/list")
@RequiresRoles("UPLOADER")
public Result<PageResult<SourceResponse>> list(
@RequestParam(defaultValue = "1") int page,
@RequestParam(defaultValue = "20") int pageSize,
@RequestParam(required = false) String dataType,
@RequestParam(required = false) String status,
HttpServletRequest request) {
TokenPrincipal principal = (TokenPrincipal) request.getAttribute("__token_principal__");
return Result.success(sourceService.list(page, pageSize, dataType, status, principal));
}
/**
* 查询资料详情(含 15 分钟预签名下载链接)。
*/
@GetMapping("/{id}")
@RequiresRoles("UPLOADER")
public Result<SourceResponse> findById(@PathVariable Long id) {
return Result.success(sourceService.findById(id));
}
/**
* 删除资料(仅 PENDING 状态可删)。
* 同步删除 RustFS 文件及 DB 记录。
*/
@DeleteMapping("/{id}")
@RequiresRoles("ADMIN")
public Result<Void> delete(@PathVariable Long id, HttpServletRequest request) {
TokenPrincipal principal = (TokenPrincipal) request.getAttribute("__token_principal__");
sourceService.delete(id, principal.getCompanyId());
return Result.success(null);
}
}

View File

@@ -0,0 +1,27 @@
package com.label.module.source.dto;
import lombok.Builder;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 资料接口统一响应体(上传、列表、详情均复用此类)。
* 各端点按需填充字段,未填充字段序列化时因 jackson non_null 配置自动省略。
*/
@Data
@Builder
public class SourceResponse {
private Long id;
private String fileName;
private String dataType;
private Long fileSize;
private String status;
/** 上传用户 ID列表端点返回 */
private Long uploaderId;
/** 15 分钟预签名下载链接(详情端点返回) */
private String presignedUrl;
/** 父资料 ID视频帧 / 文本片段;详情端点返回) */
private Long parentSourceId;
private LocalDateTime createdAt;
}

View File

@@ -0,0 +1,56 @@
package com.label.module.source.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 原始资料实体,对应 source_data 表。
*
* dataType 取值TEXT / IMAGE / VIDEO
* status 取值PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED
*/
@Data
@TableName("source_data")
public class SourceData {
@TableId(type = IdType.AUTO)
private Long id;
/** 所属公司(多租户键) */
private Long companyId;
/** 上传用户 ID */
private Long uploaderId;
/** 资料类型TEXT / IMAGE / VIDEO */
private String dataType;
/** RustFS 对象路径 */
private String filePath;
/** 原始文件名 */
private String fileName;
/** 文件大小(字节) */
private Long fileSize;
/** RustFS Bucket 名称 */
private String bucketName;
/** 父资料 ID视频帧或文本片段的自引用外键 */
private Long parentSourceId;
/** 流水线状态PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED */
private String status;
/** 保留字段(当前无 REJECTED 状态) */
private String rejectReason;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}

View File

@@ -0,0 +1,28 @@
package com.label.module.source.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.label.module.source.entity.SourceData;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
/**
* source_data 表 Mapper。
*/
@Mapper
public interface SourceDataMapper extends BaseMapper<SourceData> {
/**
* 按 ID 更新资料状态(带 company_id 租户隔离)。
*
* @param id 资料 ID
* @param status 新状态
* @param companyId 当前租户
* @return 影响行数0 表示记录不存在或不属于当前租户)
*/
@Update("UPDATE source_data SET status = #{status}, updated_at = NOW() " +
"WHERE id = #{id} AND company_id = #{companyId}")
int updateStatus(@Param("id") Long id,
@Param("status") String status,
@Param("companyId") Long companyId);
}

View File

@@ -0,0 +1,219 @@
package com.label.module.source.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.label.common.exception.BusinessException;
import com.label.common.result.PageResult;
import com.label.common.shiro.TokenPrincipal;
import com.label.common.storage.RustFsClient;
import com.label.module.source.dto.SourceResponse;
import com.label.module.source.entity.SourceData;
import com.label.module.source.mapper.SourceDataMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 原始资料业务服务。
*
* 上传流程:先 INSERT 获取 ID → 构造 RustFS 路径 → 上传文件 → UPDATE filePath。
* 删除规则:仅 PENDING 状态可删(防止删除已进入标注流水线的资料)。
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class SourceService {
private static final Set<String> VALID_DATA_TYPES = Set.of("TEXT", "IMAGE", "VIDEO");
private static final int PRESIGNED_URL_MINUTES = 15;
private final SourceDataMapper sourceDataMapper;
private final RustFsClient rustFsClient;
@Value("${rustfs.bucket:label-source-data}")
private String bucket;
// ------------------------------------------------------------------ 上传 --
/**
* 上传文件并创建 source_data 记录。
*
* @param file 上传的文件
* @param dataType 资料类型TEXT / IMAGE / VIDEO
* @param principal 当前登录用户
* @return 创建成功的资料摘要
*/
@Transactional
public SourceResponse upload(MultipartFile file, String dataType, TokenPrincipal principal) {
if (file == null || file.isEmpty()) {
throw new BusinessException("FILE_EMPTY", "上传文件不能为空", HttpStatus.BAD_REQUEST);
}
if (!VALID_DATA_TYPES.contains(dataType)) {
throw new BusinessException("INVALID_TYPE", "不支持的资料类型: " + dataType, HttpStatus.BAD_REQUEST);
}
String originalName = file.getOriginalFilename() != null ? file.getOriginalFilename() : "unknown";
// 1. 先插入占位记录,拿到自增 ID
SourceData source = new SourceData();
source.setCompanyId(principal.getCompanyId());
source.setUploaderId(principal.getUserId());
source.setDataType(dataType);
source.setFileName(originalName);
source.setFileSize(file.getSize());
source.setBucketName(bucket);
source.setFilePath(""); // 占位,后面更新
source.setStatus("PENDING");
sourceDataMapper.insert(source);
// 2. 构造 RustFS 对象路径
String objectKey = String.format("%d/%s/%d/%s",
principal.getCompanyId(), dataType.toLowerCase(), source.getId(), originalName);
// 3. 上传文件到 RustFS
try {
rustFsClient.upload(bucket, objectKey, file.getInputStream(),
file.getSize(), file.getContentType());
} catch (IOException e) {
log.error("文件上传到 RustFS 失败: bucket={}, key={}", bucket, objectKey, e);
throw new BusinessException("UPLOAD_FAILED", "文件上传失败,请重试", HttpStatus.INTERNAL_SERVER_ERROR);
}
// 4. 更新 filePath
sourceDataMapper.update(null, new LambdaUpdateWrapper<SourceData>()
.eq(SourceData::getId, source.getId())
.set(SourceData::getFilePath, objectKey));
log.debug("资料上传成功: id={}, key={}", source.getId(), objectKey);
return toUploadResponse(source, objectKey);
}
// ------------------------------------------------------------------ 列表 --
/**
* 分页查询资料列表。
* UPLOADER 只见自己上传的资料ADMIN 见本公司全部资料(多租户自动过滤)。
*/
public PageResult<SourceResponse> list(int page, int pageSize,
String dataType, String status,
TokenPrincipal principal) {
pageSize = Math.min(pageSize, 100);
LambdaQueryWrapper<SourceData> wrapper = new LambdaQueryWrapper<SourceData>()
.orderByDesc(SourceData::getCreatedAt);
// UPLOADER 只能查自己的资料
if ("UPLOADER".equals(principal.getRole())) {
wrapper.eq(SourceData::getUploaderId, principal.getUserId());
}
if (dataType != null && !dataType.isBlank()) {
wrapper.eq(SourceData::getDataType, dataType);
}
if (status != null && !status.isBlank()) {
wrapper.eq(SourceData::getStatus, status);
}
Page<SourceData> pageResult = sourceDataMapper.selectPage(new Page<>(page, pageSize), wrapper);
List<SourceResponse> items = pageResult.getRecords().stream()
.map(this::toListItem)
.collect(Collectors.toList());
return PageResult.of(items, pageResult.getTotal(), page, pageSize);
}
// ------------------------------------------------------------------ 详情 --
/**
* 按 ID 查询资料详情,含 15 分钟预签名下载链接。
*/
public SourceResponse findById(Long id) {
SourceData source = sourceDataMapper.selectById(id);
if (source == null) {
throw new BusinessException("NOT_FOUND", "资料不存在", HttpStatus.NOT_FOUND);
}
String presignedUrl = null;
if (source.getFilePath() != null && !source.getFilePath().isBlank()) {
presignedUrl = rustFsClient.getPresignedUrl(bucket, source.getFilePath(), PRESIGNED_URL_MINUTES);
}
return SourceResponse.builder()
.id(source.getId())
.fileName(source.getFileName())
.dataType(source.getDataType())
.fileSize(source.getFileSize())
.status(source.getStatus())
.presignedUrl(presignedUrl)
.parentSourceId(source.getParentSourceId())
.createdAt(source.getCreatedAt())
.build();
}
// ------------------------------------------------------------------ 删除 --
/**
* 删除资料:仅 PENDING 状态可删,同步删除 RustFS 文件。
*
* @throws BusinessException SOURCE_IN_PIPELINE(409) 资料已进入标注流程
*/
@Transactional
public void delete(Long id, Long companyId) {
SourceData source = sourceDataMapper.selectById(id);
if (source == null) {
throw new BusinessException("NOT_FOUND", "资料不存在", HttpStatus.NOT_FOUND);
}
if (!"PENDING".equals(source.getStatus())) {
throw new BusinessException("SOURCE_IN_PIPELINE",
"资料已进入标注流程,不可删除(当前状态:" + source.getStatus() + "",
HttpStatus.CONFLICT);
}
// 先删 RustFS 文件(幂等,不抛异常)
if (source.getFilePath() != null && !source.getFilePath().isBlank()) {
try {
rustFsClient.delete(bucket, source.getFilePath());
} catch (Exception e) {
log.warn("RustFS 文件删除失败(继续删 DB 记录): bucket={}, key={}", bucket, source.getFilePath(), e);
}
}
sourceDataMapper.deleteById(id);
log.debug("资料删除成功: id={}", id);
}
// ------------------------------------------------------------------ 私有工具 --
private SourceResponse toUploadResponse(SourceData source, String filePath) {
return SourceResponse.builder()
.id(source.getId())
.fileName(source.getFileName())
.dataType(source.getDataType())
.fileSize(source.getFileSize())
.status(source.getStatus())
.createdAt(source.getCreatedAt())
.build();
}
private SourceResponse toListItem(SourceData source) {
return SourceResponse.builder()
.id(source.getId())
.fileName(source.getFileName())
.dataType(source.getDataType())
.status(source.getStatus())
.uploaderId(source.getUploaderId())
.createdAt(source.getCreatedAt())
.build();
}
}

View File

@@ -0,0 +1,166 @@
package com.label.integration;
import com.label.AbstractIntegrationTest;
import com.label.common.redis.RedisKeyManager;
import com.label.common.redis.RedisService;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.*;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/**
* 原始资料管理集成测试US2
*
* 测试场景:
* 1. UPLOADER 上传文本 → 列表仅返回自己的资料
* 2. ADMIN 查看列表 → 返回全公司资料
* 3. 上传视频 → status = PENDING视频预处理由 Phase 9 处理)
* 4. 已进入流水线的资料删除 → 409 SOURCE_IN_PIPELINE
*
* 注意:本测试不连接真实 RustFS上传操作会失败并返回 500/503。
* 测试仅验证可访问的业务逻辑(权限、状态机)。
* 如需覆盖文件上传,需在测试环境配置 Mock RustFsClient 或启动 MinIO 容器。
*/
public class SourceIntegrationTest extends AbstractIntegrationTest {
private static final String UPLOADER_TOKEN = "test-uploader-token-source";
private static final String UPLOADER2_TOKEN = "test-uploader2-token-source";
private static final String ADMIN_TOKEN = "test-admin-token-source";
@Autowired
private TestRestTemplate restTemplate;
@Autowired
private RedisService redisService;
@BeforeEach
void setupTokens() {
// uploader01 token (userId=4 from init.sql seed)
redisService.hSetAll(RedisKeyManager.tokenKey(UPLOADER_TOKEN),
Map.of("userId", "4", "role", "UPLOADER", "companyId", "1", "username", "uploader01"),
3600L);
// admin token (userId=1 from init.sql seed)
redisService.hSetAll(RedisKeyManager.tokenKey(ADMIN_TOKEN),
Map.of("userId", "1", "role", "ADMIN", "companyId", "1", "username", "admin"),
3600L);
}
@AfterEach
void cleanupTokens() {
redisService.delete(RedisKeyManager.tokenKey(UPLOADER_TOKEN));
redisService.delete(RedisKeyManager.tokenKey(UPLOADER2_TOKEN));
redisService.delete(RedisKeyManager.tokenKey(ADMIN_TOKEN));
}
// ------------------------------------------------------------------ 权限测试 --
@Test
@DisplayName("无 Token 访问上传接口 → 401")
void upload_withoutToken_returns401() {
ResponseEntity<String> response = restTemplate.postForEntity(
baseUrl("/api/source/upload"), null, String.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED);
}
@Test
@DisplayName("UPLOADER 访问列表接口(无数据)→ 200items 为空")
void list_uploaderWithNoData_returnsEmptyList() {
ResponseEntity<Map> response = restTemplate.exchange(
baseUrl("/api/source/list"),
HttpMethod.GET,
bearerRequest(UPLOADER_TOKEN),
Map.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) response.getBody().get("data");
assertThat(data.get("items")).isInstanceOf(List.class);
assertThat(((List<?>) data.get("items"))).isEmpty();
assertThat(((Number) data.get("total")).longValue()).isEqualTo(0L);
}
@Test
@DisplayName("ADMIN 访问列表接口(无数据)→ 200items 为空")
void list_adminWithNoData_returnsEmptyList() {
ResponseEntity<Map> response = restTemplate.exchange(
baseUrl("/api/source/list"),
HttpMethod.GET,
bearerRequest(ADMIN_TOKEN),
Map.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) response.getBody().get("data");
assertThat(((List<?>) data.get("items"))).isEmpty();
}
@Test
@DisplayName("删除不存在的资料 → 404")
void delete_nonExistentSource_returns404() {
ResponseEntity<Map> response = restTemplate.exchange(
baseUrl("/api/source/9999"),
HttpMethod.DELETE,
bearerRequest(ADMIN_TOKEN),
Map.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@Test
@DisplayName("非 ADMIN 删除资料 → 403 Forbidden")
void delete_byUploader_returns403() {
ResponseEntity<Map> response = restTemplate.exchange(
baseUrl("/api/source/9999"),
HttpMethod.DELETE,
bearerRequest(UPLOADER_TOKEN),
Map.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN);
}
@Test
@DisplayName("ADMIN 删除已进入流水线的资料 → 409 SOURCE_IN_PIPELINE")
void delete_sourceInPipeline_returns409() {
// 直接向 DB 插入一条 EXTRACTING 状态的资料(模拟已进入流水线)
jdbcTemplate.execute(
"INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " +
"file_name, file_size, bucket_name, status) " +
"VALUES (1, 1, 'TEXT', 'test/path/file.txt', 'file.txt', 100, 'test-bucket', 'EXTRACTING')");
Long sourceId = jdbcTemplate.queryForObject(
"SELECT id FROM source_data WHERE status='EXTRACTING' LIMIT 1", Long.class);
assertThat(sourceId).isNotNull();
ResponseEntity<Map> response = restTemplate.exchange(
baseUrl("/api/source/" + sourceId),
HttpMethod.DELETE,
bearerRequest(ADMIN_TOKEN),
Map.class);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.CONFLICT);
@SuppressWarnings("unchecked")
Map<String, Object> body = response.getBody();
assertThat(body.get("code")).isEqualTo("SOURCE_IN_PIPELINE");
}
// ------------------------------------------------------------------ 工具方法 --
private HttpEntity<Void> bearerRequest(String token) {
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", "Bearer " + token);
return new HttpEntity<>(headers);
}
}