diff --git a/src/main/java/com/label/module/source/controller/SourceController.java b/src/main/java/com/label/module/source/controller/SourceController.java new file mode 100644 index 0000000..d971454 --- /dev/null +++ b/src/main/java/com/label/module/source/controller/SourceController.java @@ -0,0 +1,81 @@ +package com.label.module.source.controller; + +import com.label.common.result.PageResult; +import com.label.common.result.Result; +import com.label.common.shiro.TokenPrincipal; +import com.label.module.source.dto.SourceResponse; +import com.label.module.source.service.SourceService; +import jakarta.servlet.http.HttpServletRequest; +import lombok.RequiredArgsConstructor; +import org.apache.shiro.authz.annotation.RequiresRoles; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.multipart.MultipartFile; + +/** + * 原始资料管理接口。 + * + * 权限设计: + * - 上传 / 列表 / 详情:UPLOADER 及以上角色(含 ANNOTATOR、REVIEWER、ADMIN) + * - 删除:仅 ADMIN + */ +@RestController +@RequestMapping("/api/source") +@RequiredArgsConstructor +public class SourceController { + + private final SourceService sourceService; + + /** + * 上传文件(multipart/form-data)。 + * 返回 201 Created + 资料摘要。 + */ + @PostMapping("/upload") + @RequiresRoles("UPLOADER") + public ResponseEntity> upload( + @RequestParam("file") MultipartFile file, + @RequestParam("dataType") String dataType, + HttpServletRequest request) { + TokenPrincipal principal = (TokenPrincipal) request.getAttribute("__token_principal__"); + SourceResponse response = sourceService.upload(file, dataType, principal); + return ResponseEntity.status(HttpStatus.CREATED).body(Result.success(response)); + } + + /** + * 分页查询资料列表。 + * UPLOADER 只见自己的资料;ADMIN 见全公司资料。 + */ + @GetMapping("/list") + @RequiresRoles("UPLOADER") + public Result> list( + @RequestParam(defaultValue = "1") int page, + @RequestParam(defaultValue = "20") int pageSize, + @RequestParam(required = false) String dataType, + @RequestParam(required = false) String status, + HttpServletRequest request) { + TokenPrincipal principal = (TokenPrincipal) request.getAttribute("__token_principal__"); + return Result.success(sourceService.list(page, pageSize, dataType, status, principal)); + } + + /** + * 查询资料详情(含 15 分钟预签名下载链接)。 + */ + @GetMapping("/{id}") + @RequiresRoles("UPLOADER") + public Result findById(@PathVariable Long id) { + return Result.success(sourceService.findById(id)); + } + + /** + * 删除资料(仅 PENDING 状态可删)。 + * 同步删除 RustFS 文件及 DB 记录。 + */ + @DeleteMapping("/{id}") + @RequiresRoles("ADMIN") + public Result delete(@PathVariable Long id, HttpServletRequest request) { + TokenPrincipal principal = (TokenPrincipal) request.getAttribute("__token_principal__"); + sourceService.delete(id, principal.getCompanyId()); + return Result.success(null); + } +} diff --git a/src/main/java/com/label/module/source/dto/SourceResponse.java b/src/main/java/com/label/module/source/dto/SourceResponse.java new file mode 100644 index 0000000..418afda --- /dev/null +++ b/src/main/java/com/label/module/source/dto/SourceResponse.java @@ -0,0 +1,27 @@ +package com.label.module.source.dto; + +import lombok.Builder; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * 资料接口统一响应体(上传、列表、详情均复用此类)。 + * 各端点按需填充字段,未填充字段序列化时因 jackson non_null 配置自动省略。 + */ +@Data +@Builder +public class SourceResponse { + private Long id; + private String fileName; + private String dataType; + private Long fileSize; + private String status; + /** 上传用户 ID(列表端点返回) */ + private Long uploaderId; + /** 15 分钟预签名下载链接(详情端点返回) */ + private String presignedUrl; + /** 父资料 ID(视频帧 / 文本片段;详情端点返回) */ + private Long parentSourceId; + private LocalDateTime createdAt; +} diff --git a/src/main/java/com/label/module/source/entity/SourceData.java b/src/main/java/com/label/module/source/entity/SourceData.java new file mode 100644 index 0000000..81a342e --- /dev/null +++ b/src/main/java/com/label/module/source/entity/SourceData.java @@ -0,0 +1,56 @@ +package com.label.module.source.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.time.LocalDateTime; + +/** + * 原始资料实体,对应 source_data 表。 + * + * dataType 取值:TEXT / IMAGE / VIDEO + * status 取值:PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED + */ +@Data +@TableName("source_data") +public class SourceData { + + @TableId(type = IdType.AUTO) + private Long id; + + /** 所属公司(多租户键) */ + private Long companyId; + + /** 上传用户 ID */ + private Long uploaderId; + + /** 资料类型:TEXT / IMAGE / VIDEO */ + private String dataType; + + /** RustFS 对象路径 */ + private String filePath; + + /** 原始文件名 */ + private String fileName; + + /** 文件大小(字节) */ + private Long fileSize; + + /** RustFS Bucket 名称 */ + private String bucketName; + + /** 父资料 ID(视频帧或文本片段的自引用外键) */ + private Long parentSourceId; + + /** 流水线状态:PENDING / PREPROCESSING / EXTRACTING / QA_REVIEW / APPROVED */ + private String status; + + /** 保留字段(当前无 REJECTED 状态) */ + private String rejectReason; + + private LocalDateTime createdAt; + + private LocalDateTime updatedAt; +} diff --git a/src/main/java/com/label/module/source/mapper/SourceDataMapper.java b/src/main/java/com/label/module/source/mapper/SourceDataMapper.java new file mode 100644 index 0000000..c6ea424 --- /dev/null +++ b/src/main/java/com/label/module/source/mapper/SourceDataMapper.java @@ -0,0 +1,28 @@ +package com.label.module.source.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.label.module.source.entity.SourceData; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Update; + +/** + * source_data 表 Mapper。 + */ +@Mapper +public interface SourceDataMapper extends BaseMapper { + + /** + * 按 ID 更新资料状态(带 company_id 租户隔离)。 + * + * @param id 资料 ID + * @param status 新状态 + * @param companyId 当前租户 + * @return 影响行数(0 表示记录不存在或不属于当前租户) + */ + @Update("UPDATE source_data SET status = #{status}, updated_at = NOW() " + + "WHERE id = #{id} AND company_id = #{companyId}") + int updateStatus(@Param("id") Long id, + @Param("status") String status, + @Param("companyId") Long companyId); +} diff --git a/src/main/java/com/label/module/source/service/SourceService.java b/src/main/java/com/label/module/source/service/SourceService.java new file mode 100644 index 0000000..4d42546 --- /dev/null +++ b/src/main/java/com/label/module/source/service/SourceService.java @@ -0,0 +1,219 @@ +package com.label.module.source.service; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.label.common.exception.BusinessException; +import com.label.common.result.PageResult; +import com.label.common.shiro.TokenPrincipal; +import com.label.common.storage.RustFsClient; +import com.label.module.source.dto.SourceResponse; +import com.label.module.source.entity.SourceData; +import com.label.module.source.mapper.SourceDataMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.multipart.MultipartFile; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * 原始资料业务服务。 + * + * 上传流程:先 INSERT 获取 ID → 构造 RustFS 路径 → 上传文件 → UPDATE filePath。 + * 删除规则:仅 PENDING 状态可删(防止删除已进入标注流水线的资料)。 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class SourceService { + + private static final Set VALID_DATA_TYPES = Set.of("TEXT", "IMAGE", "VIDEO"); + private static final int PRESIGNED_URL_MINUTES = 15; + + private final SourceDataMapper sourceDataMapper; + private final RustFsClient rustFsClient; + + @Value("${rustfs.bucket:label-source-data}") + private String bucket; + + // ------------------------------------------------------------------ 上传 -- + + /** + * 上传文件并创建 source_data 记录。 + * + * @param file 上传的文件 + * @param dataType 资料类型(TEXT / IMAGE / VIDEO) + * @param principal 当前登录用户 + * @return 创建成功的资料摘要 + */ + @Transactional + public SourceResponse upload(MultipartFile file, String dataType, TokenPrincipal principal) { + if (file == null || file.isEmpty()) { + throw new BusinessException("FILE_EMPTY", "上传文件不能为空", HttpStatus.BAD_REQUEST); + } + if (!VALID_DATA_TYPES.contains(dataType)) { + throw new BusinessException("INVALID_TYPE", "不支持的资料类型: " + dataType, HttpStatus.BAD_REQUEST); + } + + String originalName = file.getOriginalFilename() != null ? file.getOriginalFilename() : "unknown"; + + // 1. 先插入占位记录,拿到自增 ID + SourceData source = new SourceData(); + source.setCompanyId(principal.getCompanyId()); + source.setUploaderId(principal.getUserId()); + source.setDataType(dataType); + source.setFileName(originalName); + source.setFileSize(file.getSize()); + source.setBucketName(bucket); + source.setFilePath(""); // 占位,后面更新 + source.setStatus("PENDING"); + sourceDataMapper.insert(source); + + // 2. 构造 RustFS 对象路径 + String objectKey = String.format("%d/%s/%d/%s", + principal.getCompanyId(), dataType.toLowerCase(), source.getId(), originalName); + + // 3. 上传文件到 RustFS + try { + rustFsClient.upload(bucket, objectKey, file.getInputStream(), + file.getSize(), file.getContentType()); + } catch (IOException e) { + log.error("文件上传到 RustFS 失败: bucket={}, key={}", bucket, objectKey, e); + throw new BusinessException("UPLOAD_FAILED", "文件上传失败,请重试", HttpStatus.INTERNAL_SERVER_ERROR); + } + + // 4. 更新 filePath + sourceDataMapper.update(null, new LambdaUpdateWrapper() + .eq(SourceData::getId, source.getId()) + .set(SourceData::getFilePath, objectKey)); + + log.debug("资料上传成功: id={}, key={}", source.getId(), objectKey); + return toUploadResponse(source, objectKey); + } + + // ------------------------------------------------------------------ 列表 -- + + /** + * 分页查询资料列表。 + * UPLOADER 只见自己上传的资料;ADMIN 见本公司全部资料(多租户自动过滤)。 + */ + public PageResult list(int page, int pageSize, + String dataType, String status, + TokenPrincipal principal) { + pageSize = Math.min(pageSize, 100); + + LambdaQueryWrapper wrapper = new LambdaQueryWrapper() + .orderByDesc(SourceData::getCreatedAt); + + // UPLOADER 只能查自己的资料 + if ("UPLOADER".equals(principal.getRole())) { + wrapper.eq(SourceData::getUploaderId, principal.getUserId()); + } + if (dataType != null && !dataType.isBlank()) { + wrapper.eq(SourceData::getDataType, dataType); + } + if (status != null && !status.isBlank()) { + wrapper.eq(SourceData::getStatus, status); + } + + Page pageResult = sourceDataMapper.selectPage(new Page<>(page, pageSize), wrapper); + + List items = pageResult.getRecords().stream() + .map(this::toListItem) + .collect(Collectors.toList()); + + return PageResult.of(items, pageResult.getTotal(), page, pageSize); + } + + // ------------------------------------------------------------------ 详情 -- + + /** + * 按 ID 查询资料详情,含 15 分钟预签名下载链接。 + */ + public SourceResponse findById(Long id) { + SourceData source = sourceDataMapper.selectById(id); + if (source == null) { + throw new BusinessException("NOT_FOUND", "资料不存在", HttpStatus.NOT_FOUND); + } + + String presignedUrl = null; + if (source.getFilePath() != null && !source.getFilePath().isBlank()) { + presignedUrl = rustFsClient.getPresignedUrl(bucket, source.getFilePath(), PRESIGNED_URL_MINUTES); + } + + return SourceResponse.builder() + .id(source.getId()) + .fileName(source.getFileName()) + .dataType(source.getDataType()) + .fileSize(source.getFileSize()) + .status(source.getStatus()) + .presignedUrl(presignedUrl) + .parentSourceId(source.getParentSourceId()) + .createdAt(source.getCreatedAt()) + .build(); + } + + // ------------------------------------------------------------------ 删除 -- + + /** + * 删除资料:仅 PENDING 状态可删,同步删除 RustFS 文件。 + * + * @throws BusinessException SOURCE_IN_PIPELINE(409) 资料已进入标注流程 + */ + @Transactional + public void delete(Long id, Long companyId) { + SourceData source = sourceDataMapper.selectById(id); + if (source == null) { + throw new BusinessException("NOT_FOUND", "资料不存在", HttpStatus.NOT_FOUND); + } + + if (!"PENDING".equals(source.getStatus())) { + throw new BusinessException("SOURCE_IN_PIPELINE", + "资料已进入标注流程,不可删除(当前状态:" + source.getStatus() + ")", + HttpStatus.CONFLICT); + } + + // 先删 RustFS 文件(幂等,不抛异常) + if (source.getFilePath() != null && !source.getFilePath().isBlank()) { + try { + rustFsClient.delete(bucket, source.getFilePath()); + } catch (Exception e) { + log.warn("RustFS 文件删除失败(继续删 DB 记录): bucket={}, key={}", bucket, source.getFilePath(), e); + } + } + + sourceDataMapper.deleteById(id); + log.debug("资料删除成功: id={}", id); + } + + // ------------------------------------------------------------------ 私有工具 -- + + private SourceResponse toUploadResponse(SourceData source, String filePath) { + return SourceResponse.builder() + .id(source.getId()) + .fileName(source.getFileName()) + .dataType(source.getDataType()) + .fileSize(source.getFileSize()) + .status(source.getStatus()) + .createdAt(source.getCreatedAt()) + .build(); + } + + private SourceResponse toListItem(SourceData source) { + return SourceResponse.builder() + .id(source.getId()) + .fileName(source.getFileName()) + .dataType(source.getDataType()) + .status(source.getStatus()) + .uploaderId(source.getUploaderId()) + .createdAt(source.getCreatedAt()) + .build(); + } +} diff --git a/src/test/java/com/label/integration/SourceIntegrationTest.java b/src/test/java/com/label/integration/SourceIntegrationTest.java new file mode 100644 index 0000000..d8d52fd --- /dev/null +++ b/src/test/java/com/label/integration/SourceIntegrationTest.java @@ -0,0 +1,166 @@ +package com.label.integration; + +import com.label.AbstractIntegrationTest; +import com.label.common.redis.RedisKeyManager; +import com.label.common.redis.RedisService; +import org.junit.jupiter.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.core.io.ByteArrayResource; +import org.springframework.http.*; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * 原始资料管理集成测试(US2)。 + * + * 测试场景: + * 1. UPLOADER 上传文本 → 列表仅返回自己的资料 + * 2. ADMIN 查看列表 → 返回全公司资料 + * 3. 上传视频 → status = PENDING(视频预处理由 Phase 9 处理) + * 4. 已进入流水线的资料删除 → 409 SOURCE_IN_PIPELINE + * + * 注意:本测试不连接真实 RustFS,上传操作会失败并返回 500/503。 + * 测试仅验证可访问的业务逻辑(权限、状态机)。 + * 如需覆盖文件上传,需在测试环境配置 Mock RustFsClient 或启动 MinIO 容器。 + */ +public class SourceIntegrationTest extends AbstractIntegrationTest { + + private static final String UPLOADER_TOKEN = "test-uploader-token-source"; + private static final String UPLOADER2_TOKEN = "test-uploader2-token-source"; + private static final String ADMIN_TOKEN = "test-admin-token-source"; + + @Autowired + private TestRestTemplate restTemplate; + + @Autowired + private RedisService redisService; + + @BeforeEach + void setupTokens() { + // uploader01 token (userId=4 from init.sql seed) + redisService.hSetAll(RedisKeyManager.tokenKey(UPLOADER_TOKEN), + Map.of("userId", "4", "role", "UPLOADER", "companyId", "1", "username", "uploader01"), + 3600L); + // admin token (userId=1 from init.sql seed) + redisService.hSetAll(RedisKeyManager.tokenKey(ADMIN_TOKEN), + Map.of("userId", "1", "role", "ADMIN", "companyId", "1", "username", "admin"), + 3600L); + } + + @AfterEach + void cleanupTokens() { + redisService.delete(RedisKeyManager.tokenKey(UPLOADER_TOKEN)); + redisService.delete(RedisKeyManager.tokenKey(UPLOADER2_TOKEN)); + redisService.delete(RedisKeyManager.tokenKey(ADMIN_TOKEN)); + } + + // ------------------------------------------------------------------ 权限测试 -- + + @Test + @DisplayName("无 Token 访问上传接口 → 401") + void upload_withoutToken_returns401() { + ResponseEntity response = restTemplate.postForEntity( + baseUrl("/api/source/upload"), null, String.class); + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.UNAUTHORIZED); + } + + @Test + @DisplayName("UPLOADER 访问列表接口(无数据)→ 200,items 为空") + void list_uploaderWithNoData_returnsEmptyList() { + ResponseEntity response = restTemplate.exchange( + baseUrl("/api/source/list"), + HttpMethod.GET, + bearerRequest(UPLOADER_TOKEN), + Map.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + + @SuppressWarnings("unchecked") + Map data = (Map) response.getBody().get("data"); + assertThat(data.get("items")).isInstanceOf(List.class); + assertThat(((List) data.get("items"))).isEmpty(); + assertThat(((Number) data.get("total")).longValue()).isEqualTo(0L); + } + + @Test + @DisplayName("ADMIN 访问列表接口(无数据)→ 200,items 为空") + void list_adminWithNoData_returnsEmptyList() { + ResponseEntity response = restTemplate.exchange( + baseUrl("/api/source/list"), + HttpMethod.GET, + bearerRequest(ADMIN_TOKEN), + Map.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + + @SuppressWarnings("unchecked") + Map data = (Map) response.getBody().get("data"); + assertThat(((List) data.get("items"))).isEmpty(); + } + + @Test + @DisplayName("删除不存在的资料 → 404") + void delete_nonExistentSource_returns404() { + ResponseEntity response = restTemplate.exchange( + baseUrl("/api/source/9999"), + HttpMethod.DELETE, + bearerRequest(ADMIN_TOKEN), + Map.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); + } + + @Test + @DisplayName("非 ADMIN 删除资料 → 403 Forbidden") + void delete_byUploader_returns403() { + ResponseEntity response = restTemplate.exchange( + baseUrl("/api/source/9999"), + HttpMethod.DELETE, + bearerRequest(UPLOADER_TOKEN), + Map.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.FORBIDDEN); + } + + @Test + @DisplayName("ADMIN 删除已进入流水线的资料 → 409 SOURCE_IN_PIPELINE") + void delete_sourceInPipeline_returns409() { + // 直接向 DB 插入一条 EXTRACTING 状态的资料(模拟已进入流水线) + jdbcTemplate.execute( + "INSERT INTO source_data (company_id, uploader_id, data_type, file_path, " + + "file_name, file_size, bucket_name, status) " + + "VALUES (1, 1, 'TEXT', 'test/path/file.txt', 'file.txt', 100, 'test-bucket', 'EXTRACTING')"); + + Long sourceId = jdbcTemplate.queryForObject( + "SELECT id FROM source_data WHERE status='EXTRACTING' LIMIT 1", Long.class); + + assertThat(sourceId).isNotNull(); + + ResponseEntity response = restTemplate.exchange( + baseUrl("/api/source/" + sourceId), + HttpMethod.DELETE, + bearerRequest(ADMIN_TOKEN), + Map.class); + + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.CONFLICT); + + @SuppressWarnings("unchecked") + Map body = response.getBody(); + assertThat(body.get("code")).isEqualTo("SOURCE_IN_PIPELINE"); + } + + // ------------------------------------------------------------------ 工具方法 -- + + private HttpEntity bearerRequest(String token) { + HttpHeaders headers = new HttpHeaders(); + headers.set("Authorization", "Bearer " + token); + return new HttpEntity<>(headers); + } +}