label_ai_service/specs/001-ai-service-requirements/tasks.md
wh 4162d9f4e6 docs: add speckit tasks breakdown for 001-ai-service-requirements
55 tasks across 8 phases, organized by 8 user stories (US1-US8).
TDD order: tests first → models → services → routers per story.
Includes parallel execution guide and incremental delivery strategy.
2026-04-10 15:05:02 +08:00


Tasks: AI Service (Knowledge Graph Labeling Platform AI Compute Service)

Input: Design documents from /specs/001-ai-service-requirements/
Prerequisites: plan.md, spec.md, research.md, data-model.md, contracts/api.md
Tests: Included — spec and plan explicitly mandate full TDD development

Organization: Tasks grouped by user story. Each phase is independently implementable and testable.

Format: [ID] [P?] [Story?] Description

  • [P]: Can run in parallel (different files, no shared dependencies)
  • [Story]: Which user story this task belongs to (US1–US8)
  • All paths are relative to project root label_ai_service/

Phase 1: Setup (Project Initialization)

Purpose: Create project skeleton and configuration files before any code is written.

  • T001 Create directory structure: app/core/, app/clients/llm/, app/clients/storage/, app/services/, app/routers/, app/models/, tests/
  • T002 Create requirements.txt with version floors: fastapi>=0.111, uvicorn[standard]>=0.29, pydantic>=2.7, zhipuai>=2.1, boto3>=1.34, pdfplumber>=0.11, python-docx>=1.1, opencv-python-headless>=4.9, numpy>=1.26, httpx>=0.27, python-dotenv>=1.0, pyyaml>=6.0, pytest>=8.0, pytest-asyncio>=0.23
  • T003 [P] Create config.yaml with default server/storage/video/models configuration (port 8000, buckets, max_file_size_mb 200, glm-4-flash / glm-4v-flash)
  • T004 [P] Create .env template with required env var keys (ZHIPUAI_API_KEY, STORAGE_ACCESS_KEY, STORAGE_SECRET_KEY, STORAGE_ENDPOINT, BACKEND_CALLBACK_URL, LOG_LEVEL, MAX_VIDEO_SIZE_MB)
  • T005 [P] Create Dockerfile (python:3.12-slim base, install requirements, expose 8000, CMD uvicorn)
  • T006 [P] Create docker-compose.yml with ai-service and rustfs services, env_file, healthcheck (curl /health every 30s)

Phase 2: Foundational (Core Infrastructure)

Purpose: Core infrastructure that MUST be complete before ANY user story can be implemented.

⚠️ CRITICAL: No user story work can begin until this phase is complete.

Config & Core Utilities

  • T007 Implement app/core/config.py: load config.yaml with PyYAML + override via _ENV_OVERRIDES dict mapping env vars to nested YAML paths (including MAX_VIDEO_SIZE_MB → video.max_file_size_mb), expose get_config() with @lru_cache
  • T008 [P] Implement app/core/logging.py: JSON structured logging via logging module, RequestLoggingMiddleware that logs path/status/latency, helper get_logger(name)
  • T009 [P] Implement app/core/exceptions.py: custom exception classes UnsupportedFileTypeError(400), VideoTooLargeError(400), StorageError(502), LLMParseError(502), LLMCallError(503), plus global exception handler that returns {"code": ..., "message": ...} JSON
  • T010 [P] Implement app/core/json_utils.py: extract_json(text) -> dict that strips Markdown code fences (```json ... ```) before json.loads, raises LLMParseError on invalid JSON
  • T011 Write tests/test_config.py: verify YAML defaults load correctly; verify MAX_VIDEO_SIZE_MB=500 env var overrides video.max_file_size_mb; verify missing required env vars surface clear errors
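The fence-stripping behavior T010 describes can be sketched as follows — a minimal, illustrative version, not the final implementation (the real `LLMParseError` lives in app/core/exceptions.py per T009):

```python
import json
import re

class LLMParseError(Exception):
    """Stand-in for the exception defined in app/core/exceptions.py (T009)."""

def extract_json(text: str):
    """Strip a surrounding Markdown code fence, then parse the JSON payload."""
    # Remove a leading ```json (or bare ```) fence and a trailing ``` fence;
    # plain JSON without fences passes through untouched.
    cleaned = re.sub(r"^\s*```(?:json)?\s*|\s*```\s*$", "", text.strip())
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError as exc:
        raise LLMParseError(f"LLM returned invalid JSON: {exc}") from exc
```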

LLM Client (LLM Adapter Layer)

  • T012 [P] Implement app/clients/llm/base.py: LLMClient ABC with abstract methods chat(model, messages) -> str and chat_vision(model, messages) -> str
  • T013 Implement app/clients/llm/zhipuai_client.py: ZhipuAIClient(LLMClient) that wraps synchronous ZhipuAI SDK calls via asyncio.get_event_loop().run_in_executor(None, ...) in a thread pool; raise LLMCallError on SDK exceptions
  • T014 [P] Write tests/test_llm_client.py: mock ZhipuAI SDK to verify chat() and chat_vision() call the SDK correctly; verify LLMCallError is raised on SDK exception; verify thread-pool wrapping does not block the event loop

Storage Client (Storage Adapter Layer)

  • T015 [P] Implement app/clients/storage/base.py: StorageClient ABC with abstract methods download_bytes(bucket, path) -> bytes, upload_bytes(bucket, path, data, content_type) -> None, get_presigned_url(bucket, path, expires) -> str, get_object_size(bucket, path) -> int
  • T016 Implement app/clients/storage/rustfs_client.py: RustFSClient(StorageClient) using boto3 S3 client; all calls wrapped via run_in_executor; get_object_size uses head_object; raise StorageError on ClientError
  • T017 [P] Write tests/test_storage_client.py: mock boto3 S3 client; verify download_bytes returns correct bytes; verify get_object_size calls head_object and returns ContentLength; verify StorageError raised on S3 exception

FastAPI Application Entry

  • T018 Implement app/main.py: create FastAPI app with lifespan, register RequestLoggingMiddleware, register global exception handlers from exceptions.py, mount all routers (empty stubs initially), expose GET /health → {"status": "ok"}
  • T019 [P] Implement app/core/dependencies.py: get_llm_client() -> LLMClient and get_storage_client() -> StorageClient as @lru_cache singletons, instantiated from get_config() values
  • T020 Write tests/conftest.py: mock_llm fixture (AsyncMock implementing LLMClient), mock_storage fixture (AsyncMock implementing StorageClient with get_object_size returning 10MB), test_app fixture overriding Depends, client fixture using TestClient
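The `@lru_cache` singleton pattern T019 prescribes is small enough to sketch directly; the class and values below are hypothetical placeholders for the real config and client types:

```python
from functools import lru_cache

class FakeConfig:
    """Placeholder for the object returned by app/core/config.get_config()."""
    llm_api_key = "test-key"

@lru_cache
def get_config() -> FakeConfig:
    return FakeConfig()

@lru_cache
def get_llm_client() -> object:
    # In the real app/core/dependencies.py this would construct a
    # ZhipuAIClient from get_config() values; @lru_cache on a zero-argument
    # function makes it a process-wide singleton.
    return object()
```

FastAPI's `Depends` can then take `get_llm_client` directly, and tests override it via `app.dependency_overrides`, which is what the T020 fixtures rely on.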

Checkpoint: Foundation complete — all user story phases can now begin in parallel.


Phase 3: User Story 1 — ADMIN extracts knowledge triples from documents (Priority: P1) 🎯 MVP

Goal: POST /api/v1/text/extract reads a TXT/PDF/DOCX file from RustFS, calls GLM, returns structured triples with source offsets.

Independent Test: Send {"file_path": "text/test.txt", "file_name": "test.txt"} to the endpoint; verify response contains items with subject, predicate, object, source_snippet, source_offset.start/end.

Tests for User Story 1 ⚠️ Write FIRST — verify FAIL before implementing

  • T021 [P] [US1] Write tests/test_text_service.py: test TXT parsing returns triples; test PDF parsing (mock pdfplumber); test DOCX parsing (mock python-docx); test unsupported format raises UnsupportedFileTypeError; test storage failure raises StorageError; test LLM parse error raises LLMParseError

Implementation for User Story 1

  • T022 [P] [US1] Create app/models/text_models.py: SourceOffset(start: int, end: int), TripleItem(subject, predicate, object, source_snippet, source_offset), TextExtractRequest(file_path, file_name, model?, prompt_template?), TextExtractResponse(items: list[TripleItem])
  • T023 [US1] Implement app/services/text_service.py: extract_triples(req, llm, storage) -> TextExtractResponse; dispatch to _parse_txt / _parse_pdf / _parse_docx by file extension; build prompt from content + optional prompt_template; call llm.chat(); parse JSON response via extract_json(); validate triple fields; raise typed exceptions
  • T024 [US1] Write tests/test_text_router.py: POST /api/v1/text/extract returns 200 with items; unsupported format returns 400 with UNSUPPORTED_FILE_TYPE; storage error returns 502 with STORAGE_ERROR; LLM parse error returns 502 with LLM_PARSE_ERROR
  • T025 [US1] Implement app/routers/text.py: APIRouter(prefix="/api/v1") with POST /text/extract handler that injects storage and llm via Depends, calls text_service.extract_triples(); register router in app/main.py
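The extension dispatch in T023 can be sketched as a table lookup; the parser stubs below are placeholders (the real `_parse_pdf` / `_parse_docx` wrap pdfplumber and python-docx):

```python
from pathlib import Path

class UnsupportedFileTypeError(Exception):
    """Stand-in for the 400-mapped exception from T009."""

def _parse_txt(data: bytes) -> str:
    return data.decode("utf-8")

def _parse_pdf(data: bytes) -> str:
    ...  # real version: pdfplumber over an in-memory buffer

def _parse_docx(data: bytes) -> str:
    ...  # real version: python-docx over an in-memory buffer

_PARSERS = {".txt": _parse_txt, ".pdf": _parse_pdf, ".docx": _parse_docx}

def parse_document(file_name: str, data: bytes) -> str:
    """Dispatch to a parser by file extension; reject unknown formats."""
    suffix = Path(file_name).suffix.lower()
    parser = _PARSERS.get(suffix)
    if parser is None:
        raise UnsupportedFileTypeError(f"unsupported extension: {suffix}")
    return parser(data)
```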

Checkpoint: POST /api/v1/text/extract fully functional. Run pytest tests/test_text_service.py tests/test_text_router.py -v — all green.


Phase 4: User Story 2 — ADMIN extracts knowledge quadruples from images with automatic cropping (Priority: P1)

Goal: POST /api/v1/image/extract downloads an image from RustFS, calls GLM-4V, crops bbox regions, uploads crops, returns quads with cropped_image_path.

Independent Test: Send {"file_path": "image/test.jpg", "task_id": 1} to the endpoint; verify response contains items each with bbox, qualifier, and cropped_image_path matching pattern crops/1/{n}.jpg.

Tests for User Story 2 ⚠️ Write FIRST — verify FAIL before implementing

  • T026 [P] [US2] Write tests/test_image_service.py: test full quad extraction pipeline with mock LLM returning valid JSON; test bbox crop uses correct pixel coordinates; test out-of-bounds bbox is clamped to image dimensions; test crop upload path follows crops/{task_id}/{index}.jpg convention; test LLM parse error raises LLMParseError

Implementation for User Story 2

  • T027 [P] [US2] Create app/models/image_models.py: BBox(x, y, w, h: int), QuadrupleItem(subject, predicate, object, qualifier?, bbox, cropped_image_path), ImageExtractRequest(file_path, task_id, model?, prompt_template?), ImageExtractResponse(items: list[QuadrupleItem])
  • T028 [US2] Implement app/services/image_service.py: extract_quads(req, llm, storage) -> ImageExtractResponse; download image bytes → decode with OpenCV (cv2.imdecode); base64 encode image for GLM-4V multimodal message; call llm.chat_vision(); parse JSON via extract_json(); for each quad, clamp bbox to image dimensions, crop with numpy slicing, encode as JPEG, upload to crops/{task_id}/{index}.jpg; return quads with paths
  • T029 [US2] Write tests/test_image_router.py: POST /api/v1/image/extract returns 200 with items; LLM parse error returns 502; storage download failure returns 502
  • T030 [US2] Implement app/routers/image.py: POST /image/extract handler; register in app/main.py
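The bbox clamping that T026/T028 call for is pure arithmetic; a minimal sketch (field names match the `BBox(x, y, w, h)` model from T027):

```python
def clamp_bbox(x: int, y: int, w: int, h: int,
               img_w: int, img_h: int) -> tuple[int, int, int, int]:
    """Clamp a model-reported bbox so the crop stays inside the image."""
    x = max(0, min(x, img_w - 1))
    y = max(0, min(y, img_h - 1))
    w = max(1, min(w, img_w - x))
    h = max(1, min(h, img_h - y))
    return x, y, w, h

# The crop itself is then plain numpy slicing on the decoded image:
#     crop = img[y:y + h, x:x + w]
```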

Checkpoint: POST /api/v1/image/extract fully functional. Run pytest tests/test_image_service.py tests/test_image_router.py -v — all green.


Phase 5: User Stories 3 & 4 — Video Frame Extraction + Video-to-Text (Priority: P2)

Goal: POST /api/v1/video/extract-frames and POST /api/v1/video/to-text immediately return 202, process video in background via FastAPI BackgroundTasks, then POST callback to Java backend with results.

Independent Test (US3): Send extract-frames request; verify immediate 202 with job_id; mock storage and callback URL; verify callback received with status=SUCCESS and non-empty frames list.

Independent Test (US4): Send to-text request with start_sec=0, end_sec=10; verify immediate 202; verify callback received with status=SUCCESS and output_path pointing to an uploaded text file.

Tests for User Stories 3 & 4 ⚠️ Write FIRST — verify FAIL before implementing

  • T031 [P] [US3] Write tests/test_video_service.py (frame extraction tests): generate small test video via cv2.VideoWriter; test interval mode extracts correct frame indices; test keyframe mode only extracts frames exceeding difference threshold; test each extracted frame is uploaded to frames/{source_id}/{index}.jpg; test failed extraction triggers FAILED callback with error_message
  • T032 [P] [US4] Append to tests/test_video_service.py (to-text tests): test uniform sampling selects frame_sample_count frames from [start_sec, end_sec] window; test sampled frames are passed as base64 to llm.chat_vision(); test output text is uploaded to video-text/{source_id}/{timestamp}.txt; test LLM failure triggers FAILED callback

Implementation for User Stories 3 & 4

  • T033 [US3] Create app/models/video_models.py: ExtractFramesRequest(file_path, source_id, job_id, mode="interval", frame_interval=30), VideoToTextRequest(file_path, source_id, job_id, start_sec, end_sec, model?, prompt_template?), FrameInfo(frame_index, time_sec, frame_path), VideoJobCallback(job_id, status, frames?, output_path?, error_message?), VideoAcceptedResponse(message, job_id)
  • T034 [US3] Implement frame extraction in app/services/video_service.py: extract_frames_task(req, llm, storage, callback_url) background function; download video to temp file; open with cv2.VideoCapture; interval mode: step by frame_interval; keyframe mode: compute grayscale frame diff, extract when diff > threshold (default 30.0); upload each frame JPEG; POST callback with FrameInfo list; clean up temp file; catch all exceptions and POST FAILED callback
  • T035 [US4] Implement to-text in app/services/video_service.py: video_to_text_task(req, llm, storage, callback_url) background function; download video to temp file; sample frame_sample_count frames uniformly within [start_sec, end_sec]; base64 encode frames; call llm.chat_vision() with all frames in one multimodal message; upload text result to video-text/{source_id}/{timestamp}.txt; POST callback with output_path; clean up temp file
  • T036 [US3] Write tests/test_video_router.py: POST /api/v1/video/extract-frames returns 202 immediately; video exceeding max_file_size_mb returns 400 with VIDEO_TOO_LARGE; background task is registered (mock BackgroundTasks)
  • T037 [US4] Append to tests/test_video_router.py: POST /api/v1/video/to-text returns 202; size limit applies equally
  • T038 [US3] Implement app/routers/video.py: _check_video_size(storage, bucket, file_path, max_mb) helper that calls storage.get_object_size() and raises VideoTooLargeError; POST /video/extract-frames and POST /video/to-text handlers check size then enqueue background task; register router in app/main.py
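The uniform sampling T035 describes reduces to index arithmetic once the video's fps is known from `cv2.VideoCapture`; a sketch of that step in isolation:

```python
def sample_frame_indices(start_sec: float, end_sec: float,
                         fps: float, count: int) -> list[int]:
    """Pick `count` frame indices spread uniformly across [start_sec, end_sec].

    The first and last samples land on the window edges; the real task then
    seeks to each index with cv2.VideoCapture before base64-encoding frames.
    """
    if count < 1 or end_sec <= start_sec:
        return []
    if count == 1:
        return [round(start_sec * fps)]
    step = (end_sec - start_sec) / (count - 1)
    return [round((start_sec + i * step) * fps) for i in range(count)]
```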

Checkpoint: Both video endpoints fully functional. Run pytest tests/test_video_service.py tests/test_video_router.py -v — all green.


Phase 6: User Stories 5 & 6 — Text QA Generation + Image QA Generation (Priority: P2)

Goal: POST /api/v1/qa/gen-text generates QA pairs from text triples; POST /api/v1/qa/gen-image generates multimodal QA pairs from image quads (images fetched and base64-encoded internally).

Independent Test (US5): Send {"items": [{"subject":"变压器","predicate":"额定电压","object":"110kV","source_snippet":"..."}]} to gen-text; verify response contains pairs with non-empty question and answer.

Independent Test (US6): Send {"items": [{"subject":"...","cropped_image_path":"crops/1/0.jpg",...}]} to gen-image; verify response contains pairs with image_path matching crops/1/0.jpg.

Tests for User Stories 5 & 6 ⚠️ Write FIRST — verify FAIL before implementing

  • T039 [P] [US5] Write tests/test_qa_service.py (text QA tests): test triples are formatted into prompt correctly; test LLM response JSON is parsed into QAPair list; test LLMParseError on malformed LLM response; test LLMCallError propagates correctly
  • T040 [P] [US6] Append to tests/test_qa_service.py (image QA tests): test storage downloads cropped image and encodes as base64 before LLM call; test multimodal message includes both text (quad info) and inline image data URI; test StorageError on failed image download

Implementation for User Stories 5 & 6

  • T041 [P] [US5] Create app/models/qa_models.py: TextQAItem(subject, predicate, object, source_snippet), GenTextQARequest(items, model?, prompt_template?), QAPair(question, answer), ImageQAItem(subject, predicate, object, qualifier?, cropped_image_path), GenImageQARequest(items, model?, prompt_template?), ImageQAPair(question, answer, image_path), TextQAResponse(pairs), ImageQAResponse(pairs)
  • T042 [US5] Implement gen_text_qa(req, llm) -> TextQAResponse in app/services/qa_service.py: format all triples + source snippets into a single batch prompt; call llm.chat(); parse JSON array via extract_json(); return QAPair list
  • T043 [US6] Implement gen_image_qa(req, llm, storage) -> ImageQAResponse in app/services/qa_service.py: for each ImageQAItem, download cropped_image_path bytes from source-data bucket; base64 encode; build multimodal message with quad text + data:image/jpeg;base64,... inline URL; call llm.chat_vision(); parse JSON; return ImageQAPair with image_path = item.cropped_image_path
  • T044 [US5] Write tests/test_qa_router.py: POST /api/v1/qa/gen-text returns 200 with pairs; POST /api/v1/qa/gen-image returns 200 with pairs including image_path; LLM errors return 502/503
  • T045 [US5] Implement app/routers/qa.py: POST /qa/gen-text and POST /qa/gen-image handlers; register router in app/main.py
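The multimodal message construction in T043 can be sketched as below; the exact message schema is an assumption modeled on common OpenAI-style vision APIs, so verify it against the ZhipuAI GLM-4V documentation before relying on it:

```python
import base64

def build_image_qa_message(quad_text: str, image_bytes: bytes) -> list[dict]:
    """Build a single multimodal user message: quad text + inline JPEG.

    Assumed message shape (text part + image_url part with a data URI);
    confirm against the real GLM-4V API before use.
    """
    b64 = base64.b64encode(image_bytes).decode("ascii")
    return [{
        "role": "user",
        "content": [
            {"type": "text", "text": quad_text},
            {"type": "image_url",
             "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
        ],
    }]
```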

Checkpoint: Both QA endpoints fully functional. Run pytest tests/test_qa_service.py tests/test_qa_router.py -v — all green.


Phase 7: User Stories 7 & 8 — Fine-tune Job Management + Health Check (Priority: P3)

Goal: POST /api/v1/finetune/start submits a ZhipuAI fine-tune job; GET /api/v1/finetune/status/{jobId} queries its state; GET /health returns service liveness.

Independent Test (US7): Call POST /finetune/start with mock LLM returning a job ID; then call GET /finetune/status/{jobId}; verify status is one of RUNNING/SUCCESS/FAILED and progress is an integer.

Independent Test (US8): GET /health returns {"status": "ok"} with HTTP 200 in under 1 second.

Tests for User Stories 7 & 8 ⚠️ Write FIRST — verify FAIL before implementing

  • T046 [P] [US7] Write tests/test_finetune_service.py: test submit_finetune() calls ZhipuAI finetune API with correct params and returns job_id; test get_status() maps ZhipuAI "running"→RUNNING, "succeeded"→SUCCESS, "failed"→FAILED, unknown status→RUNNING (conservative); test LLMCallError on SDK failure
  • T047 [P] [US8] Write health check test in tests/test_finetune_router.py (or new tests/test_health.py): GET /health returns 200 with {"status": "ok"}

Implementation for User Stories 7 & 8

  • T048 [P] [US7] Create app/models/finetune_models.py: FinetuneStartRequest(jsonl_url, base_model, hyperparams?), FinetuneStartResponse(job_id), FinetuneStatusResponse(job_id, status, progress?, error_message?)
  • T049 [US7] Implement app/services/finetune_service.py: submit_finetune(req, llm) -> FinetuneStartResponse calls ZhipuAI fine-tune create API via run_in_executor; get_finetune_status(job_id, llm) -> FinetuneStatusResponse calls ZhipuAI fine-tune retrieve API and maps status strings; raise LLMCallError on failure
  • T050 [US7] Write tests/test_finetune_router.py: POST /api/v1/finetune/start returns 200 with job_id; GET /api/v1/finetune/status/{jobId} returns 200 with status fields; unknown job_id propagates error response
  • T051 [US7] Implement app/routers/finetune.py: POST /finetune/start and GET /finetune/status/{job_id} handlers; register router in app/main.py
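The status mapping T046 tests is a small pure function; a sketch of the conservative unknown-status behavior:

```python
_STATUS_MAP = {"running": "RUNNING", "succeeded": "SUCCESS", "failed": "FAILED"}

def map_finetune_status(sdk_status: str) -> str:
    """Map ZhipuAI status strings to the service's enum.

    Unknown statuses map to RUNNING on purpose: callers keep polling
    rather than mis-reporting an in-flight job as failed.
    """
    return _STATUS_MAP.get(sdk_status.lower(), "RUNNING")
```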

Checkpoint: All 8 user stories complete. Run pytest tests/ -v — all green.


Phase 8: Polish & Cross-Cutting Concerns

Purpose: Final integration, documentation verification, and deployment readiness.

  • T052 [P] Create .gitignore for Python project (.env, __pycache__/, *.pyc, .pytest_cache/, tmp/ for video temp files)
  • T053 Run full test suite conda run -n label pytest tests/ -v --cov=app --cov-report=term-missing and fix any remaining failures or coverage gaps
  • T054 [P] Verify Swagger/OpenAPI docs at http://localhost:8000/docs show all 9 endpoints with correct request/response schemas
  • T055 Validate quickstart.md end-to-end: conda activate label && pip install -r requirements.txt && conda run -n label uvicorn app.main:app --reload starts cleanly; GET /health returns 200; docker-compose up -d builds and healthcheck passes

Dependencies & Execution Order

Phase Dependencies

Phase 1 (Setup)
    └─→ Phase 2 (Foundational) ← BLOCKS everything
            ├─→ Phase 3 (US1, P1) ─┐
            ├─→ Phase 4 (US2, P1) ─┤ Can run in parallel after Phase 2
            ├─→ Phase 5 (US3+4, P2)─┤
            ├─→ Phase 6 (US5+6, P2)─┤
            └─→ Phase 7 (US7+8, P3)─┘
                    └─→ Phase 8 (Polish)

User Story Dependencies

| Story | Priority | Depends On | Blocking |
|---|---|---|---|
| US1 (text triples) | P1 | Phase 2 only | Nothing |
| US2 (image quadruples) | P1 | Phase 2 only | US6 (shares image downloading pattern) |
| US3 (video frame extraction) | P2 | Phase 2 only | Nothing |
| US4 (video-to-text) | P2 | Phase 2, US3 (shares video_service.py) | Nothing |
| US5 (text QA) | P2 | Phase 2 only | Nothing |
| US6 (image QA) | P2 | Phase 2 only | Nothing |
| US7 (fine-tune management) | P3 | Phase 2 only | Nothing |
| US8 (health check) | P3 | T018 (main.py) | Nothing |

Within Each User Story

  1. Tests MUST be written first and verified to FAIL before implementation
  2. Models → Services → Routers (in dependency order)
  3. Register router in main.py after router file is complete
  4. Run story-specific tests before marking story done

Parallel Opportunities

All tasks marked [P] within a phase can run concurrently (different files):

  • Phase 2: T008, T009, T010 (core utilities) + T012, T014 (LLM) + T015, T017 (Storage) + T019 (dependencies)
  • Phase 3: T021 (tests) and T022 (models) can start together
  • Phase 4: T026 (tests) and T027 (models) can start together
  • Phase 5: T031 (US3 tests) and T032 (US4 tests) can start together
  • Phase 6: T039 (US5 tests) and T040, T041 (US6 tests + models) can start together
  • Phase 7: T046, T047, T048 can start together

Parallel Example: Phase 2 Foundational

# Kick off these in parallel (all different files):
[T008] app/core/logging.py
[T009] app/core/exceptions.py
[T010] app/core/json_utils.py
[T012] app/clients/llm/base.py
[T014] tests/test_llm_client.py
[T015] app/clients/storage/base.py
[T017] tests/test_storage_client.py
[T019] app/core/dependencies.py

# Then in sequence (each depends on previous):
[T007] app/core/config.py  →  [T011] tests/test_config.py
[T013] app/clients/llm/zhipuai_client.py (needs T012)
[T016] app/clients/storage/rustfs_client.py (needs T015)
[T018] app/main.py (needs T009, T008)
[T020] tests/conftest.py (needs T018, T013, T016)

Implementation Strategy

MVP First (US1 + US2 — P1 Stories Only)

  1. Complete Phase 1: Setup
  2. Complete Phase 2: Foundational (CRITICAL — blocks all stories)
  3. Complete Phase 3: US1 (text triple extraction) → validate independently
  4. Complete Phase 4: US2 (image quadruple extraction) → validate independently
  5. STOP and DEMO: Core extraction pipeline is production-ready

Incremental Delivery

Phase 1+2 complete  →  Foundation ready (commit)
Phase 3 complete    →  Text extraction works  (commit, demo)
Phase 4 complete    →  Image extraction works (commit, demo)
Phase 5 complete    →  Video processing works (commit, demo)
Phase 6 complete    →  QA generation works   (commit, demo)
Phase 7 complete    →  Fine-tune management  (commit, demo)
Phase 8 complete    →  Production-ready      (tag release)

Parallel Team Strategy

With two developers after Phase 2 completes:

  • Dev A: US1 (text) → US5 (text QA) → US7 (finetune)
  • Dev B: US2 (image) → US6 (image QA) → US3+US4 (video)

Summary

| Phase | Tasks | User Story | Priority |
|---|---|---|---|
| Phase 1: Setup | T001–T006 (6) | — | — |
| Phase 2: Foundational | T007–T020 (14) | — | — |
| Phase 3 | T021–T025 (5) | US1 text triples | P1 🎯 MVP |
| Phase 4 | T026–T030 (5) | US2 image quadruples | P1 |
| Phase 5 | T031–T038 (8) | US3+US4 video processing | P2 |
| Phase 6 | T039–T045 (7) | US5+US6 QA generation | P2 |
| Phase 7 | T046–T051 (6) | US7+US8 fine-tune + health check | P3 |
| Phase 8: Polish | T052–T055 (4) | — | — |
| Total | 55 tasks | 8 user stories | — |

Notes

  • [P] tasks = different files, no shared dependencies within the same phase
  • [US?] label maps each task to its user story for traceability
  • Tests in tests/conftest.py (T020) use AsyncMock — no real ZhipuAI or RustFS calls in unit tests
  • Video tasks use a real small video file generated by cv2.VideoWriter in tests — no external media needed
  • All config is loaded via get_config() — never hardcode model names or bucket names in services
  • Commit after each phase checkpoint at minimum; commit after each task for clean git history
  • Stop at any checkpoint to validate the story independently before proceeding