55 tasks across 8 phases, organized by 8 user stories (US1-US8). TDD order: tests first → models → services → routers per story. Includes parallel execution guide and incremental delivery strategy.
# Tasks: AI Service (AI compute service for the knowledge graph annotation platform)

**Input**: Design documents from `/specs/001-ai-service-requirements/`
**Prerequisites**: plan.md ✅, spec.md ✅, research.md ✅, data-model.md ✅, contracts/api.md ✅
**Tests**: Included — the spec and plan explicitly mandate full TDD development
**Organization**: Tasks are grouped by user story. Each phase is independently implementable and testable.

**Format**: `[ID] [P?] [Story?] Description`

- **[P]**: Can run in parallel (different files, no shared dependencies)
- **[Story]**: Which user story this task belongs to (US1–US8)
- All paths are relative to the project root `label_ai_service/`
## Phase 1: Setup (project initialization)

**Purpose**: Create the project skeleton and configuration files before any code is written.

- T001 Create the directory structure: `app/core/`, `app/clients/llm/`, `app/clients/storage/`, `app/services/`, `app/routers/`, `app/models/`, `tests/`
- T002 Create `requirements.txt` with version-constrained dependencies: fastapi≥0.111, uvicorn[standard]≥0.29, pydantic≥2.7, zhipuai≥2.1, boto3≥1.34, pdfplumber≥0.11, python-docx≥1.1, opencv-python-headless≥4.9, numpy≥1.26, httpx≥0.27, python-dotenv≥1.0, pyyaml≥6.0, pytest≥8.0, pytest-asyncio≥0.23
- T003 [P] Create `config.yaml` with the default server/storage/video/models configuration (port 8000, buckets, max_file_size_mb 200, glm-4-flash / glm-4v-flash)
- T004 [P] Create a `.env` template with the required env var keys (ZHIPUAI_API_KEY, STORAGE_ACCESS_KEY, STORAGE_SECRET_KEY, STORAGE_ENDPOINT, BACKEND_CALLBACK_URL, LOG_LEVEL, MAX_VIDEO_SIZE_MB)
- T005 [P] Create the `Dockerfile` (python:3.12-slim base, install requirements, expose 8000, CMD uvicorn)
- T006 [P] Create `docker-compose.yml` with ai-service and rustfs services, env_file, and a healthcheck (curl /health every 30s)
## Phase 2: Foundational (core infrastructure)

**Purpose**: Core infrastructure that MUST be complete before ANY user story can be implemented.

⚠️ **CRITICAL**: No user story work can begin until this phase is complete.
### Config & Core Utilities

- T007 Implement `app/core/config.py`: load `config.yaml` with PyYAML + override via an `_ENV_OVERRIDES` dict mapping env vars to nested YAML paths (including `MAX_VIDEO_SIZE_MB → video.max_file_size_mb`); expose `get_config()` with `@lru_cache`
- T008 [P] Implement `app/core/logging.py`: JSON structured logging via the `logging` module, a `RequestLoggingMiddleware` that logs path/status/latency, and a helper `get_logger(name)`
- T009 [P] Implement `app/core/exceptions.py`: custom exception classes `UnsupportedFileTypeError` (400), `VideoTooLargeError` (400), `StorageError` (502), `LLMParseError` (502), `LLMCallError` (503), plus a global exception handler that returns `{"code": ..., "message": ...}` JSON
- T010 [P] Implement `app/core/json_utils.py`: `extract_json(text) -> dict` that strips Markdown code fences (```json ... ```) before `json.loads` and raises `LLMParseError` on invalid JSON
- T011 Write `tests/test_config.py`: verify YAML defaults load correctly; verify a `MAX_VIDEO_SIZE_MB=500` env var overrides `video.max_file_size_mb`; verify missing required env vars surface clear errors
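The fence-stripping behavior T010 asks for can be sketched as follows. This is a minimal sketch, not the final module: the regex is illustrative, and the real `LLMParseError` would be imported from `app/core/exceptions.py` rather than redefined here.

```python
import json
import re

class LLMParseError(Exception):
    """Stand-in for the app/core/exceptions.py class (maps to HTTP 502)."""

def extract_json(text: str) -> dict:
    """Strip an optional ```json ... ``` (or bare ```) fence, then parse."""
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    payload = match.group(1) if match else text.strip()
    try:
        return json.loads(payload)
    except json.JSONDecodeError as exc:
        # Surface the parse failure as a typed error for the global handler
        raise LLMParseError(f"LLM returned invalid JSON: {exc}") from exc
```

Handling both fenced and bare responses matters because GLM models sometimes wrap JSON in Markdown fences and sometimes do not.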
### LLM Client (LLM adapter layer)

- T012 [P] Implement `app/clients/llm/base.py`: an `LLMClient` ABC with abstract methods `chat(model, messages) -> str` and `chat_vision(model, messages) -> str`
- T013 Implement `app/clients/llm/zhipuai_client.py`: a `ZhipuAIClient(LLMClient)` that wraps synchronous ZhipuAI SDK calls via `asyncio.get_event_loop().run_in_executor(None, ...)` in a thread pool; raise `LLMCallError` on SDK exceptions
- T014 [P] Write `tests/test_llm_client.py`: mock the ZhipuAI SDK to verify `chat()` and `chat_vision()` call the SDK correctly; verify `LLMCallError` is raised on SDK exception; verify the thread-pool wrapping does not block the event loop
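The thread-pool wrapping pattern from T013 can be sketched like this. The `sdk` object and its `chat` signature are placeholders for the real ZhipuAI SDK (whose actual method names should be checked against its docs), and `chat_vision` is omitted for brevity:

```python
import asyncio
from abc import ABC, abstractmethod

class LLMCallError(Exception):
    """Stand-in for the app/core/exceptions.py class (maps to HTTP 503)."""

class LLMClient(ABC):
    """Trimmed version of the T012 ABC."""
    @abstractmethod
    async def chat(self, model: str, messages: list) -> str: ...

class ZhipuAIClient(LLMClient):
    """Sketch of T013; `sdk` is injected so tests can pass a fake."""

    def __init__(self, sdk):
        self._sdk = sdk

    async def chat(self, model: str, messages: list) -> str:
        loop = asyncio.get_event_loop()
        try:
            # Run the blocking SDK call in the default thread pool so the
            # event loop stays free to serve other requests meanwhile.
            return await loop.run_in_executor(
                None, lambda: self._sdk.chat(model=model, messages=messages)
            )
        except Exception as exc:
            raise LLMCallError(str(exc)) from exc
```

Injecting the SDK through the constructor is what makes the T014 tests possible without real API calls.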
### Storage Client (storage adapter layer)

- T015 [P] Implement `app/clients/storage/base.py`: a `StorageClient` ABC with abstract methods `download_bytes(bucket, path) -> bytes`, `upload_bytes(bucket, path, data, content_type) -> None`, `get_presigned_url(bucket, path, expires) -> str`, `get_object_size(bucket, path) -> int`
- T016 Implement `app/clients/storage/rustfs_client.py`: a `RustFSClient(StorageClient)` using the boto3 S3 client; all calls wrapped via `run_in_executor`; `get_object_size` uses `head_object`; raise `StorageError` on `ClientError`
- T017 [P] Write `tests/test_storage_client.py`: mock the boto3 S3 client; verify `download_bytes` returns the correct bytes; verify `get_object_size` calls `head_object` and returns `ContentLength`; verify `StorageError` is raised on S3 exception
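The `get_object_size` piece of T016 might look like the sketch below. The `s3` object is injected (any boto3-style S3 client works), and the broad `except Exception` stands in for the narrower `botocore.exceptions.ClientError` handling the task specifies:

```python
import asyncio

class StorageError(Exception):
    """Stand-in for the app/core/exceptions.py class (maps to HTTP 502)."""

class RustFSClient:
    """Partial sketch of T016; the real client comes from boto3.client("s3", ...)."""

    def __init__(self, s3):
        self._s3 = s3

    async def get_object_size(self, bucket: str, path: str) -> int:
        loop = asyncio.get_event_loop()
        try:
            # head_object fetches metadata only — no body download just to
            # learn the size, which matters for the video size-limit check.
            head = await loop.run_in_executor(
                None, lambda: self._s3.head_object(Bucket=bucket, Key=path)
            )
            return head["ContentLength"]
        except Exception as exc:
            raise StorageError(str(exc)) from exc
```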
### FastAPI Application Entry

- T018 Implement `app/main.py`: create the FastAPI app with lifespan, register `RequestLoggingMiddleware`, register the global exception handlers from `exceptions.py`, mount all routers (empty stubs initially), expose `GET /health → {"status": "ok"}`
- T019 [P] Implement `app/core/dependencies.py`: `get_llm_client() -> LLMClient` and `get_storage_client() -> StorageClient` as `@lru_cache` singletons, instantiated from `get_config()` values
- T020 Write `tests/conftest.py`: a `mock_llm` fixture (AsyncMock implementing LLMClient), a `mock_storage` fixture (AsyncMock implementing StorageClient with `get_object_size` returning 10 MB), a `test_app` fixture overriding Depends, and a `client` fixture using `TestClient`
**Checkpoint**: Foundation complete — all user story phases can now begin in parallel.
## Phase 3: User Story 1 — ADMIN extracts knowledge triples from documents (Priority: P1) 🎯 MVP

**Goal**: `POST /api/v1/text/extract` reads a TXT/PDF/DOCX file from RustFS, calls GLM, and returns structured triples with source offsets.

**Independent Test**: Send `{"file_path": "text/test.txt", "file_name": "test.txt"}` to the endpoint; verify the response contains items with `subject`, `predicate`, `object`, `source_snippet`, and `source_offset.start/end`.
### Tests for User Story 1 (⚠️ write FIRST — verify they FAIL before implementing)

- T021 [P] [US1] Write `tests/test_text_service.py`: test TXT parsing returns triples; test PDF parsing (mock pdfplumber); test DOCX parsing (mock python-docx); test an unsupported format raises `UnsupportedFileTypeError`; test a storage failure raises `StorageError`; test an LLM parse error raises `LLMParseError`
### Implementation for User Story 1

- T022 [P] [US1] Create `app/models/text_models.py`: `SourceOffset(start: int, end: int)`, `TripleItem(subject, predicate, object, source_snippet, source_offset)`, `TextExtractRequest(file_path, file_name, model?, prompt_template?)`, `TextExtractResponse(items: list[TripleItem])`
- T023 [US1] Implement `app/services/text_service.py`: `extract_triples(req, llm, storage) -> TextExtractResponse`; dispatch to `_parse_txt / _parse_pdf / _parse_docx` by file extension; build the prompt from the content + optional `prompt_template`; call `llm.chat()`; parse the JSON response via `extract_json()`; validate triple fields; raise typed exceptions
- T024 [US1] Write `tests/test_text_router.py`: POST `/api/v1/text/extract` returns 200 with items; an unsupported format returns 400 with `UNSUPPORTED_FILE_TYPE`; a storage error returns 502 with `STORAGE_ERROR`; an LLM parse error returns 502 with `LLM_PARSE_ERROR`
- T025 [US1] Implement `app/routers/text.py`: `APIRouter(prefix="/api/v1")` with a `POST /text/extract` handler that injects `storage` and `llm` via Depends and calls `text_service.extract_triples()`; register the router in `app/main.py`
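The extension dispatch in T023 might look like the sketch below. The parser bodies are stubs — the real `_parse_pdf` and `_parse_docx` wrap pdfplumber and python-docx — and `parse_file` is a hypothetical helper name:

```python
import os

class UnsupportedFileTypeError(Exception):
    """Stand-in; maps to HTTP 400 / UNSUPPORTED_FILE_TYPE in the global handler."""

def _parse_txt(data: bytes) -> str:
    return data.decode("utf-8")

# Stub parsers; the real ones wrap pdfplumber / python-docx.
def _parse_pdf(data: bytes) -> str: ...
def _parse_docx(data: bytes) -> str: ...

_PARSERS = {".txt": _parse_txt, ".pdf": _parse_pdf, ".docx": _parse_docx}

def parse_file(file_name: str, data: bytes) -> str:
    """Dispatch by extension, case-insensitively, per T023."""
    ext = os.path.splitext(file_name)[1].lower()
    try:
        parser = _PARSERS[ext]
    except KeyError:
        raise UnsupportedFileTypeError(f"unsupported extension: {ext!r}") from None
    return parser(data)
```

A dict-based dispatch keeps adding a new format to a one-line change, and the `KeyError` → typed-exception translation is what the T024 router test asserts as a 400.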
**Checkpoint**: `POST /api/v1/text/extract` fully functional. Run `pytest tests/test_text_service.py tests/test_text_router.py -v` — all green.
## Phase 4: User Story 2 — ADMIN extracts knowledge quadruples from images with automatic cropping (Priority: P1)

**Goal**: `POST /api/v1/image/extract` downloads an image from RustFS, calls GLM-4V, crops the bbox regions, uploads the crops, and returns quads with `cropped_image_path`.

**Independent Test**: Send `{"file_path": "image/test.jpg", "task_id": 1}` to the endpoint; verify the response contains items each with `bbox`, `qualifier`, and a `cropped_image_path` matching the pattern `crops/1/{n}.jpg`.
### Tests for User Story 2 (⚠️ write FIRST — verify they FAIL before implementing)

- T026 [P] [US2] Write `tests/test_image_service.py`: test the full quad-extraction pipeline with a mock LLM returning valid JSON; test the bbox crop uses the correct pixel coordinates; test an out-of-bounds bbox is clamped to the image dimensions; test the crop upload path follows the `crops/{task_id}/{index}.jpg` convention; test an LLM parse error raises `LLMParseError`
### Implementation for User Story 2

- T027 [P] [US2] Create `app/models/image_models.py`: `BBox(x, y, w, h: int)`, `QuadrupleItem(subject, predicate, object, qualifier?, bbox, cropped_image_path)`, `ImageExtractRequest(file_path, task_id, model?, prompt_template?)`, `ImageExtractResponse(items: list[QuadrupleItem])`
- T028 [US2] Implement `app/services/image_service.py`: `extract_quads(req, llm, storage) -> ImageExtractResponse`; download the image bytes → decode with OpenCV (`cv2.imdecode`); base64-encode the image for the GLM-4V multimodal message; call `llm.chat_vision()`; parse JSON via `extract_json()`; for each quad, clamp the bbox to the image dimensions, crop with numpy slicing, encode as JPEG, upload to `crops/{task_id}/{index}.jpg`; return the quads with paths
- T029 [US2] Write `tests/test_image_router.py`: POST `/api/v1/image/extract` returns 200 with items; an LLM parse error returns 502; a storage download failure returns 502
- T030 [US2] Implement `app/routers/image.py`: `POST /image/extract` handler; register in `app/main.py`
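The clamp-then-crop step of T028 can be sketched directly (helper names are illustrative; `image` is the `cv2.imdecode` result, a height × width × channels array):

```python
import numpy as np

def clamp_bbox(x: int, y: int, w: int, h: int, img_w: int, img_h: int) -> tuple:
    """Clamp a model-reported bbox to the image bounds (the T026 requirement)."""
    x0 = max(0, min(x, img_w - 1))
    y0 = max(0, min(y, img_h - 1))
    x1 = max(x0 + 1, min(x + w, img_w))  # keep ≥ 1 px so the crop is never empty
    y1 = max(y0 + 1, min(y + h, img_h))
    return x0, y0, x1, y1

def crop_region(image: np.ndarray, x: int, y: int, w: int, h: int) -> np.ndarray:
    """Crop with plain numpy slicing; rows are y, columns are x."""
    img_h, img_w = image.shape[:2]
    x0, y0, x1, y1 = clamp_bbox(x, y, w, h, img_w, img_h)
    return image[y0:y1, x0:x1]
```

Clamping matters because vision models routinely emit boxes that overhang the image edge; without it, numpy would silently return a wrongly-shaped (or empty) crop that fails JPEG encoding.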
**Checkpoint**: `POST /api/v1/image/extract` fully functional. Run `pytest tests/test_image_service.py tests/test_image_router.py -v` — all green.
## Phase 5: User Stories 3 & 4 — Video Frame Extraction + Video-to-Text (Priority: P2)

**Goal**: `POST /api/v1/video/extract-frames` and `POST /api/v1/video/to-text` immediately return 202, process the video in the background via FastAPI `BackgroundTasks`, then POST a callback to the Java backend with the results.

**Independent Test (US3)**: Send an extract-frames request; verify an immediate 202 with `job_id`; mock storage and the callback URL; verify the callback is received with `status=SUCCESS` and a non-empty `frames` list.

**Independent Test (US4)**: Send a to-text request with `start_sec=0, end_sec=10`; verify an immediate 202; verify the callback is received with `status=SUCCESS` and an `output_path` pointing to an uploaded text file.
### Tests for User Stories 3 & 4 (⚠️ write FIRST — verify they FAIL before implementing)

- T031 [P] [US3] Write `tests/test_video_service.py` (frame-extraction tests): generate a small test video via `cv2.VideoWriter`; test interval mode extracts the correct frame indices; test keyframe mode only extracts frames exceeding the difference threshold; test each extracted frame is uploaded to `frames/{source_id}/{index}.jpg`; test a failed extraction triggers a FAILED callback with `error_message`
- T032 [P] [US4] Append to `tests/test_video_service.py` (to-text tests): test uniform sampling selects `frame_sample_count` frames from the `[start_sec, end_sec]` window; test the sampled frames are passed as base64 to `llm.chat_vision()`; test the output text is uploaded to `video-text/{source_id}/{timestamp}.txt`; test an LLM failure triggers a FAILED callback
### Implementation for User Stories 3 & 4

- T033 [US3] Create `app/models/video_models.py`: `ExtractFramesRequest(file_path, source_id, job_id, mode="interval", frame_interval=30)`, `VideoToTextRequest(file_path, source_id, job_id, start_sec, end_sec, model?, prompt_template?)`, `FrameInfo(frame_index, time_sec, frame_path)`, `VideoJobCallback(job_id, status, frames?, output_path?, error_message?)`, `VideoAcceptedResponse(message, job_id)`
- T034 [US3] Implement frame extraction in `app/services/video_service.py`: an `extract_frames_task(req, llm, storage, callback_url)` background function; download the video to a temp file; open with `cv2.VideoCapture`; interval mode: step by `frame_interval`; keyframe mode: compute the grayscale frame diff and extract when diff > threshold (default 30.0); upload each frame as JPEG; POST the callback with the `FrameInfo` list; clean up the temp file; catch all exceptions and POST a FAILED callback
- T035 [US4] Implement to-text in `app/services/video_service.py`: a `video_to_text_task(req, llm, storage, callback_url)` background function; download the video to a temp file; sample `frame_sample_count` frames uniformly within `[start_sec, end_sec]`; base64-encode the frames; call `llm.chat_vision()` with all frames in one multimodal message; upload the text result to `video-text/{source_id}/{timestamp}.txt`; POST the callback with `output_path`; clean up the temp file
- T036 [US3] Write `tests/test_video_router.py`: POST `/api/v1/video/extract-frames` returns 202 immediately; a video exceeding `max_file_size_mb` returns 400 with `VIDEO_TOO_LARGE`; the background task is registered (mock BackgroundTasks)
- T037 [US4] Append to `tests/test_video_router.py`: POST `/api/v1/video/to-text` returns 202; the size limit applies equally
- T038 [US3] Implement `app/routers/video.py`: a `_check_video_size(storage, bucket, file_path, max_mb)` helper that calls `storage.get_object_size()` and raises `VideoTooLargeError`; `POST /video/extract-frames` and `POST /video/to-text` handlers check the size then enqueue the background task; register the router in `app/main.py`
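The uniform-sampling step T035 describes reduces to picking evenly spaced timestamps and converting them to frame indices at the video's FPS. A sketch (the function name is illustrative):

```python
def sample_frame_indices(start_sec: float, end_sec: float,
                         fps: float, count: int) -> list[int]:
    """Uniformly pick `count` frame indices within [start_sec, end_sec]."""
    if count <= 1:
        return [int(start_sec * fps)]
    # Spread `count` points so the first lands on start_sec and the last on end_sec
    step = (end_sec - start_sec) / (count - 1)
    return [int((start_sec + i * step) * fps) for i in range(count)]
```

The background task would then seek to each index with `cv2.VideoCapture.set(cv2.CAP_PROP_POS_FRAMES, idx)` before reading and encoding the frame.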
**Checkpoint**: Both video endpoints fully functional. Run `pytest tests/test_video_service.py tests/test_video_router.py -v` — all green.
## Phase 6: User Stories 5 & 6 — Text QA Generation + Image QA Generation (Priority: P2)

**Goal**: `POST /api/v1/qa/gen-text` generates QA pairs from text triples; `POST /api/v1/qa/gen-image` generates multimodal QA pairs from image quads (images fetched and base64-encoded internally).

**Independent Test (US5)**: Send `{"items": [{"subject":"变压器","predicate":"额定电压","object":"110kV","source_snippet":"..."}]}` to gen-text; verify the response contains pairs with a non-empty `question` and `answer`.

**Independent Test (US6)**: Send `{"items": [{"subject":"...","cropped_image_path":"crops/1/0.jpg",...}]}` to gen-image; verify the response contains pairs with `image_path` matching `crops/1/0.jpg`.
### Tests for User Stories 5 & 6 (⚠️ write FIRST — verify they FAIL before implementing)

- T039 [P] [US5] Write `tests/test_qa_service.py` (text QA tests): test the triples are formatted into the prompt correctly; test the LLM response JSON is parsed into a `QAPair` list; test `LLMParseError` on a malformed LLM response; test `LLMCallError` propagates correctly
- T040 [P] [US6] Append to `tests/test_qa_service.py` (image QA tests): test storage downloads the cropped image and encodes it as base64 before the LLM call; test the multimodal message includes both text (quad info) and an inline image data URI; test `StorageError` on a failed image download
### Implementation for User Stories 5 & 6

- T041 [P] [US5] Create `app/models/qa_models.py`: `TextQAItem(subject, predicate, object, source_snippet)`, `GenTextQARequest(items, model?, prompt_template?)`, `QAPair(question, answer)`, `ImageQAItem(subject, predicate, object, qualifier?, cropped_image_path)`, `GenImageQARequest(items, model?, prompt_template?)`, `ImageQAPair(question, answer, image_path)`, `TextQAResponse(pairs)`, `ImageQAResponse(pairs)`
- T042 [US5] Implement `gen_text_qa(req, llm) -> TextQAResponse` in `app/services/qa_service.py`: format all triples + source snippets into a single batch prompt; call `llm.chat()`; parse the JSON array via `extract_json()`; return the `QAPair` list
- T043 [US6] Implement `gen_image_qa(req, llm, storage) -> ImageQAResponse` in `app/services/qa_service.py`: for each `ImageQAItem`, download the `cropped_image_path` bytes from the `source-data` bucket; base64-encode; build a multimodal message with the quad text + a `data:image/jpeg;base64,...` inline URL; call `llm.chat_vision()`; parse the JSON; return `ImageQAPair` with `image_path = item.cropped_image_path`
- T044 [US5] Write `tests/test_qa_router.py`: POST `/api/v1/qa/gen-text` returns 200 with pairs; POST `/api/v1/qa/gen-image` returns 200 with pairs including `image_path`; LLM errors return 502/503
- T045 [US5] Implement `app/routers/qa.py`: `POST /qa/gen-text` and `POST /qa/gen-image` handlers; register the router in `app/main.py`
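The multimodal-message construction in T043 can be sketched as below. The content schema follows the OpenAI-style layout that vision chat APIs commonly accept; treat it as an assumption to verify against the actual GLM-4V SDK docs, and the function name is illustrative:

```python
import base64

def build_multimodal_message(quad_text: str, image_bytes: bytes) -> dict:
    """One user message mixing quad text and an inline base64 JPEG."""
    b64 = base64.b64encode(image_bytes).decode("ascii")
    return {
        "role": "user",
        "content": [
            {"type": "text", "text": quad_text},
            # Inline data URI — no presigned URL needed, the image travels
            # inside the request body.
            {"type": "image_url",
             "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
        ],
    }
```

Embedding the crop as a data URI keeps the LLM call independent of RustFS network reachability, which is exactly what the T040 test (inline image data URI) checks for.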
**Checkpoint**: Both QA endpoints fully functional. Run `pytest tests/test_qa_service.py tests/test_qa_router.py -v` — all green.
## Phase 7: User Stories 7 & 8 — Fine-Tune Task Management + Health Check (Priority: P3)

**Goal**: `POST /api/v1/finetune/start` submits a ZhipuAI fine-tune job; `GET /api/v1/finetune/status/{jobId}` queries its state; `GET /health` returns service liveness.

**Independent Test (US7)**: Call `POST /finetune/start` with a mock LLM returning a job ID; then call `GET /finetune/status/{jobId}`; verify the status is one of RUNNING/SUCCESS/FAILED and `progress` is an integer.

**Independent Test (US8)**: `GET /health` returns `{"status": "ok"}` with HTTP 200 in under 1 second.
### Tests for User Stories 7 & 8 (⚠️ write FIRST — verify they FAIL before implementing)

- T046 [P] [US7] Write `tests/test_finetune_service.py`: test `submit_finetune()` calls the ZhipuAI finetune API with the correct params and returns `job_id`; test `get_status()` maps ZhipuAI `"running"` → RUNNING, `"succeeded"` → SUCCESS, `"failed"` → FAILED, and any unknown status → RUNNING (conservative); test `LLMCallError` on SDK failure
- T047 [P] [US8] Write the health-check test in `tests/test_finetune_router.py` (or a new `tests/test_health.py`): `GET /health` returns 200 with `{"status": "ok"}`
### Implementation for User Stories 7 & 8

- T048 [P] [US7] Create `app/models/finetune_models.py`: `FinetuneStartRequest(jsonl_url, base_model, hyperparams?)`, `FinetuneStartResponse(job_id)`, `FinetuneStatusResponse(job_id, status, progress?, error_message?)`
- T049 [US7] Implement `app/services/finetune_service.py`: `submit_finetune(req, llm) -> FinetuneStartResponse` calls the ZhipuAI fine-tune create API via `run_in_executor`; `get_finetune_status(job_id, llm) -> FinetuneStatusResponse` calls the ZhipuAI fine-tune retrieve API and maps the status strings; raise `LLMCallError` on failure
- T050 [US7] Write `tests/test_finetune_router.py`: `POST /api/v1/finetune/start` returns 200 with `job_id`; `GET /api/v1/finetune/status/{jobId}` returns 200 with the status fields; an unknown `job_id` propagates an error response
- T051 [US7] Implement `app/routers/finetune.py`: `POST /finetune/start` and `GET /finetune/status/{job_id}` handlers; register the router in `app/main.py`
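The conservative status mapping from T046 is small enough to sketch whole (the provider status strings come from the task description; any others ZhipuAI may emit fall through to RUNNING by design):

```python
# Unknown provider states stay RUNNING, so the backend never marks a
# live job as finished by mistake.
_STATUS_MAP = {"running": "RUNNING", "succeeded": "SUCCESS", "failed": "FAILED"}

def map_finetune_status(provider_status: str) -> str:
    """Map a provider status string to the service's RUNNING/SUCCESS/FAILED enum."""
    return _STATUS_MAP.get(provider_status.lower(), "RUNNING")
```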
**Checkpoint**: All 8 user stories complete. Run `pytest tests/ -v` — all green.
## Phase 8: Polish & Cross-Cutting Concerns

**Purpose**: Final integration, documentation verification, and deployment readiness.
- T052 [P] Create a `.gitignore` for the Python project (`.env`, `__pycache__/`, `*.pyc`, `.pytest_cache/`, `tmp/` for video temp files)
- T053 Run the full test suite: `conda run -n label pytest tests/ -v --cov=app --cov-report=term-missing` and fix any remaining failures or coverage gaps
- T054 [P] Verify the Swagger/OpenAPI docs at `http://localhost:8000/docs` show all 9 endpoints with correct request/response schemas
- T055 Validate quickstart.md end-to-end: `conda activate label && pip install -r requirements.txt && conda run -n label uvicorn app.main:app --reload` starts cleanly; `GET /health` returns 200; `docker-compose up -d` builds and the healthcheck passes
## Dependencies & Execution Order

### Phase Dependencies
```text
Phase 1 (Setup)
 └─→ Phase 2 (Foundational) ← BLOCKS everything
       ├─→ Phase 3 (US1, P1) ─┐
       ├─→ Phase 4 (US2, P1) ─┤ can run in parallel after Phase 2
       ├─→ Phase 5 (US3+4, P2)─┤
       ├─→ Phase 6 (US5+6, P2)─┤
       └─→ Phase 7 (US7+8, P3)─┘
             └─→ Phase 8 (Polish)
```
### User Story Dependencies

| Story | Priority | Depends On | Blocks |
|---|---|---|---|
| US1 (text triples) | P1 | Phase 2 only | Nothing |
| US2 (image quadruples) | P1 | Phase 2 only | US6 (shares the image-download pattern) |
| US3 (video frame extraction) | P2 | Phase 2 only | Nothing |
| US4 (video-to-text) | P2 | Phase 2, US3 (shares video_service.py) | Nothing |
| US5 (text QA) | P2 | Phase 2 only | Nothing |
| US6 (image QA) | P2 | Phase 2 only | Nothing |
| US7 (fine-tune management) | P3 | Phase 2 only | Nothing |
| US8 (health check) | P3 | T018 (main.py) | Nothing |
### Within Each User Story

- Tests MUST be written first and verified to FAIL before implementation
- Models → Services → Routers (in dependency order)
- Register the router in `main.py` after the router file is complete
- Run the story-specific tests before marking the story done
### Parallel Opportunities
All tasks marked [P] within a phase can run concurrently (different files):
- Phase 2: T008, T009, T010 (core utilities) + T012, T014 (LLM) + T015, T017 (Storage) + T019 (dependencies)
- Phase 3: T021 (tests) and T022 (models) can start together
- Phase 4: T026 (tests) and T027 (models) can start together
- Phase 5: T031 (US3 tests) and T032 (US4 tests) can start together
- Phase 6: T039 (US5 tests) and T040, T041 (US6 tests + models) can start together
- Phase 7: T046, T047, T048 can start together
### Parallel Example: Phase 2 Foundational

```text
# Kick off these in parallel (all different files):
[T008] app/core/logging.py
[T009] app/core/exceptions.py
[T010] app/core/json_utils.py
[T012] app/clients/llm/base.py
[T014] tests/test_llm_client.py
[T015] app/clients/storage/base.py
[T017] tests/test_storage_client.py
[T019] app/core/dependencies.py

# Then in sequence (each depends on the previous):
[T007] app/core/config.py → [T011] tests/test_config.py
[T013] app/clients/llm/zhipuai_client.py (needs T012)
[T016] app/clients/storage/rustfs_client.py (needs T015)
[T018] app/main.py (needs T008, T009)
[T020] tests/conftest.py (needs T013, T016, T018)
```
## Implementation Strategy

### MVP First (US1 + US2 — P1 stories only)
- Complete Phase 1: Setup
- Complete Phase 2: Foundational (CRITICAL — blocks all stories)
- Complete Phase 3: US1 (text triple extraction) → validate independently
- Complete Phase 4: US2 (image quadruple extraction) → validate independently
- STOP and DEMO: Core extraction pipeline is production-ready
### Incremental Delivery

1. Phase 1+2 complete → foundation ready (commit)
2. Phase 3 complete → text extraction works (commit, demo)
3. Phase 4 complete → image extraction works (commit, demo)
4. Phase 5 complete → video processing works (commit, demo)
5. Phase 6 complete → QA generation works (commit, demo)
6. Phase 7 complete → fine-tune management works (commit, demo)
7. Phase 8 complete → production-ready (tag release)
### Parallel Team Strategy
With two developers after Phase 2 completes:
- Dev A: US1 (text) → US5 (text QA) → US7 (finetune)
- Dev B: US2 (image) → US6 (image QA) → US3+US4 (video)
## Summary

| Phase | Tasks | User Story | Priority |
|---|---|---|---|
| Phase 1: Setup | T001–T006 (6) | — | — |
| Phase 2: Foundational | T007–T020 (14) | — | — |
| Phase 3 | T021–T025 (5) | US1 text triples | P1 🎯 MVP |
| Phase 4 | T026–T030 (5) | US2 image quadruples | P1 |
| Phase 5 | T031–T038 (8) | US3+US4 video processing | P2 |
| Phase 6 | T039–T045 (7) | US5+US6 QA generation | P2 |
| Phase 7 | T046–T051 (6) | US7+US8 fine-tune + health check | P3 |
| Phase 8: Polish | T052–T055 (4) | — | — |
| **Total** | **55 tasks** | **8 user stories** | — |
## Notes

- [P] tasks = different files, no shared dependencies within the same phase
- The [US?] label maps each task to its user story for traceability
- Tests in `tests/conftest.py` (T020) use `AsyncMock` — no real ZhipuAI or RustFS calls in unit tests
- Video tests use a real small video file generated by `cv2.VideoWriter` — no external media needed
- All config is loaded via `get_config()` — never hardcode model names or bucket names in services
- Commit after each phase checkpoint at minimum; commit after each task for clean git history
- Stop at any checkpoint to validate the story independently before proceeding