Covers architecture, adapter layer (LLMClient/StorageClient ABC), all 8 API endpoints with request/response schemas, service layer logic, config strategy (config.yaml + .env), logging, exception handling, RustFS path conventions, and Docker deployment.
24 KiB
知识图谱智能标注平台 — AI 服务设计文档
版本:v1.0 | 日期:2026-04-10
运行时:Python 3.12.13(condalabel环境)| 框架:FastAPI
上游系统:label-backend(Java Spring Boot)| 模型:ZhipuAI GLM 系列
一、项目定位
AI 服务(label_ai_service)是标注平台的智能计算层,独立部署为 Python FastAPI 服务,接收 Java 后端调用,完成以下核心任务:
| 能力 | 说明 |
|---|---|
| 文本三元组提取 | 从 TXT / PDF / DOCX 文档中提取 subject / predicate / object + 原文定位信息 |
| 图像四元组提取 | 调用 GLM-4V 分析图片,提取四元组 + bbox 坐标,自动裁剪区域图 |
| 视频帧提取 | OpenCV 按间隔或关键帧模式抽帧,帧图上传 RustFS |
| 视频转文本 | GLM-4V 理解视频片段,输出结构化文字描述,降维为文本标注流程 |
| 问答对生成 | 基于三元组/四元组 + 原文/图像证据,生成 GLM 微调格式候选问答对 |
| 微调任务管理 | 向 ZhipuAI 提交微调任务、查询状态 |
系统只有两条标注流水线(文本线、图片线),视频是两种预处理入口,不构成第三条流水线。
二、整体架构
2.1 在平台中的位置
┌─────────────┐
│ Nginx 反代 │
└──────┬──────┘
┌─────────────┼─────────────┐
▼ ▼ ▼
┌─────────┐ ┌──────────┐ ┌──────────┐
│ Vue3 前端│ │ Spring │ │ FastAPI │
│ (静态) │ │ Boot 后端 │ │ AI 服务 │◄── 本文档范围
└─────────┘ └────┬─────┘ └────┬─────┘
│ │
┌───────────┼──────────────┤
▼ ▼ ▼
┌──────────┐ ┌────────┐ ┌────────────┐
│PostgreSQL│ │ Redis │ │ RustFS │
└──────────┘ └────────┘ └────────────┘
AI 服务不直接访问数据库,只通过:
- RustFS S3 API:读取原始文件、写入处理结果
- ZhipuAI API:调用 GLM 系列模型
- Java 后端回调接口:视频异步任务完成后回传结果
2.2 目录结构
label_ai_service/
├── app/
│ ├── main.py # FastAPI 应用入口,注册路由、lifespan
│ ├── core/
│ │ ├── config.py # YAML + .env 分层配置,lru_cache 单例
│ │ ├── logging.py # 统一结构化日志配置
│ │ ├── exceptions.py # 自定义异常类 + 全局异常处理器
│ │ └── dependencies.py # FastAPI Depends 工厂函数
│ ├── clients/
│ │ ├── llm/
│ │ │ ├── base.py # LLMClient ABC(抽象接口)
│ │ │ └── zhipuai_client.py # ZhipuAI 实现
│ │ └── storage/
│ │ ├── base.py # StorageClient ABC(抽象接口)
│ │ └── rustfs_client.py # RustFS S3 兼容实现(boto3)
│ ├── services/
│ │ ├── text_service.py # 文档解析 + 三元组提取
│ │ ├── image_service.py # 图像四元组提取 + bbox 裁剪
│ │ ├── video_service.py # OpenCV 抽帧 + 视频转文本
│ │ ├── qa_service.py # 文本/图像问答对生成
│ │ └── finetune_service.py # 微调任务提交与状态查询
│ ├── routers/
│ │ ├── text.py # POST /api/v1/text/extract
│ │ ├── image.py # POST /api/v1/image/extract
│ │ ├── video.py # POST /api/v1/video/extract-frames
│ │ │ # POST /api/v1/video/to-text
│ │ ├── qa.py # POST /api/v1/qa/gen-text
│ │ │ # POST /api/v1/qa/gen-image
│ │ └── finetune.py # POST /api/v1/finetune/start
│ │ # GET /api/v1/finetune/status/{jobId}
│ └── models/
│ ├── text_models.py # 三元组请求/响应 schema
│ ├── image_models.py # 四元组请求/响应 schema
│ ├── video_models.py # 视频处理请求/响应 schema
│ ├── qa_models.py # 问答对请求/响应 schema
│ └── finetune_models.py # 微调请求/响应 schema
├── config.yaml # 非敏感配置(提交 git)
├── .env # 密钥与环境差异项(提交 git)
├── requirements.txt
├── Dockerfile
└── docker-compose.yml
三、配置设计
3.1 分层配置原则
| 文件 | 职责 | 提交 git |
|---|---|---|
config.yaml |
稳定配置:端口、路径规范、模型名、桶名、视频参数 | ✅ |
.env |
环境差异项:密钥、服务地址 | ✅ |
环境变量优先级高于 config.yaml,Docker Compose 通过 env_file 加载 .env,本地开发由 python-dotenv 加载。
3.2 config.yaml
server:
port: 8000
log_level: INFO
storage:
buckets:
source_data: "source-data"
finetune_export: "finetune-export"
backend: {} # callback_url 由 .env 注入
video:
frame_sample_count: 8 # 视频转文本时均匀抽取的代表帧数
models:
default_text: "glm-4-flash"
default_vision: "glm-4v-flash"
3.3 .env
ZHIPUAI_API_KEY=your-zhipuai-api-key
STORAGE_ACCESS_KEY=minioadmin
STORAGE_SECRET_KEY=minioadmin
STORAGE_ENDPOINT=http://rustfs:9000
BACKEND_CALLBACK_URL=http://backend:8080/internal/video-job/callback
3.4 config 模块实现
# core/config.py
import os, yaml
from functools import lru_cache
from pathlib import Path
from dotenv import load_dotenv
_ROOT = Path(__file__).parent.parent.parent
# 环境变量 → YAML 路径映射
_ENV_OVERRIDES = {
"ZHIPUAI_API_KEY": ["zhipuai", "api_key"],
"STORAGE_ACCESS_KEY": ["storage", "access_key"],
"STORAGE_SECRET_KEY": ["storage", "secret_key"],
"STORAGE_ENDPOINT": ["storage", "endpoint"],
"BACKEND_CALLBACK_URL": ["backend", "callback_url"],
"LOG_LEVEL": ["server", "log_level"],
}
def _set_nested(d: dict, keys: list[str], value: str):
for k in keys[:-1]:
d = d.setdefault(k, {})
d[keys[-1]] = value
@lru_cache(maxsize=1)
def get_config() -> dict:
load_dotenv(_ROOT / ".env") # 1. 加载 .env
with open(_ROOT / "config.yaml", encoding="utf-8") as f:
cfg = yaml.safe_load(f) # 2. 读取 YAML
for env_key, yaml_path in _ENV_OVERRIDES.items(): # 3. 环境变量覆盖
val = os.environ.get(env_key)
if val:
_set_nested(cfg, yaml_path, val)
_validate(cfg)
return cfg
def _validate(cfg: dict):
checks = [
(["zhipuai", "api_key"], "ZHIPUAI_API_KEY"),
(["storage", "access_key"], "STORAGE_ACCESS_KEY"),
(["storage", "secret_key"], "STORAGE_SECRET_KEY"),
]
for path, name in checks:
val = cfg
for k in path:
val = (val or {}).get(k, "")
if not val:
raise RuntimeError(f"缺少必要配置项:{name}")
四、适配层设计
4.1 LLM 适配层
# clients/llm/base.py
from abc import ABC, abstractmethod
class LLMClient(ABC):
@abstractmethod
async def chat(self, messages: list[dict], model: str, **kwargs) -> str:
"""纯文本对话,返回模型输出文本"""
@abstractmethod
async def chat_vision(self, messages: list[dict], model: str, **kwargs) -> str:
"""多模态对话(图文混合输入),返回模型输出文本"""
# clients/llm/zhipuai_client.py
import asyncio
from zhipuai import ZhipuAI
from .base import LLMClient
class ZhipuAIClient(LLMClient):
def __init__(self, api_key: str):
self._client = ZhipuAI(api_key=api_key)
async def chat(self, messages: list[dict], model: str, **kwargs) -> str:
loop = asyncio.get_event_loop()
resp = await loop.run_in_executor(
None,
lambda: self._client.chat.completions.create(
model=model, messages=messages, **kwargs
),
)
return resp.choices[0].message.content
async def chat_vision(self, messages: list[dict], model: str, **kwargs) -> str:
# GLM-4V 与文本接口相同,通过 image_url type 区分图文消息
return await self.chat(messages, model, **kwargs)
扩展:替换 GLM 只需新增 class OpenAIClient(LLMClient) 并在 lifespan 中注入,services 层零修改。
4.2 Storage 适配层
# clients/storage/base.py
from abc import ABC, abstractmethod
class StorageClient(ABC):
@abstractmethod
async def download_bytes(self, bucket: str, path: str) -> bytes: ...
@abstractmethod
async def upload_bytes(
self, bucket: str, path: str, data: bytes,
content_type: str = "application/octet-stream"
) -> None: ...
@abstractmethod
def get_presigned_url(self, bucket: str, path: str, expires: int = 3600) -> str: ...
# clients/storage/rustfs_client.py
import asyncio
import boto3
from .base import StorageClient
class RustFSClient(StorageClient):
def __init__(self, endpoint: str, access_key: str, secret_key: str):
self._s3 = boto3.client(
"s3",
endpoint_url=endpoint,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
)
async def download_bytes(self, bucket: str, path: str) -> bytes:
loop = asyncio.get_event_loop()
resp = await loop.run_in_executor(
None, lambda: self._s3.get_object(Bucket=bucket, Key=path)
)
return resp["Body"].read()
async def upload_bytes(self, bucket, path, data, content_type="application/octet-stream"):
loop = asyncio.get_event_loop()
await loop.run_in_executor(
None,
lambda: self._s3.put_object(
Bucket=bucket, Key=path, Body=data, ContentType=content_type
),
)
def get_presigned_url(self, bucket: str, path: str, expires: int = 3600) -> str:
return self._s3.generate_presigned_url(
"get_object",
Params={"Bucket": bucket, "Key": path},
ExpiresIn=expires,
)
4.3 依赖注入
# core/dependencies.py
from app.clients.llm.base import LLMClient
from app.clients.storage.base import StorageClient
_llm_client: LLMClient | None = None
_storage_client: StorageClient | None = None
def set_clients(llm: LLMClient, storage: StorageClient):
global _llm_client, _storage_client
_llm_client, _storage_client = llm, storage
def get_llm_client() -> LLMClient:
return _llm_client
def get_storage_client() -> StorageClient:
return _storage_client
# main.py(lifespan 初始化)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.core.config import get_config
from app.core.dependencies import set_clients
from app.clients.llm.zhipuai_client import ZhipuAIClient
from app.clients.storage.rustfs_client import RustFSClient
@asynccontextmanager
async def lifespan(app: FastAPI):
cfg = get_config()
set_clients(
llm=ZhipuAIClient(api_key=cfg["zhipuai"]["api_key"]),
storage=RustFSClient(
endpoint=cfg["storage"]["endpoint"],
access_key=cfg["storage"]["access_key"],
secret_key=cfg["storage"]["secret_key"],
),
)
yield
app = FastAPI(title="Label AI Service", lifespan=lifespan)
五、API 接口设计
统一前缀:/api/v1。FastAPI 自动生成 Swagger 文档(/docs)。
5.1 文本三元组提取
POST /api/v1/text/extract
// 请求
{
"file_path": "text/202404/123.txt",
"file_name": "设备规范.txt",
"model": "glm-4-flash",
"prompt_template": "..." // 可选,不传使用 config 默认
}
// 响应
{
"items": [
{
"subject": "变压器",
"predicate": "额定电压",
"object": "110kV",
"source_snippet": "该变压器额定电压为110kV,...",
"source_offset": {"start": 120, "end": 280}
}
]
}
5.2 图像四元组提取
POST /api/v1/image/extract
// 请求
{
"file_path": "image/202404/456.jpg",
"task_id": 789,
"model": "glm-4v-flash",
"prompt_template": "..."
}
// 响应
{
"items": [
{
"subject": "电缆接头",
"predicate": "位于",
"object": "配电箱左侧",
"qualifier": "2024年检修现场",
"bbox": {"x": 10, "y": 20, "w": 100, "h": 80},
"cropped_image_path": "crops/789/0.jpg"
}
]
}
裁剪图由 AI 服务自动完成并上传 RustFS,cropped_image_path 直接写入响应。
5.3 视频帧提取(异步)
POST /api/v1/video/extract-frames
// 请求
{
"file_path": "video/202404/001.mp4",
"source_id": 10,
"job_id": 42,
"mode": "interval", // interval | keyframe
"frame_interval": 30 // interval 模式专用,单位:帧数
}
// 立即响应(202 Accepted)
{
"message": "任务已接受,后台处理中",
"job_id": 42
}
后台完成后,AI 服务调用 Java 后端回调接口:
POST {BACKEND_CALLBACK_URL}
{
"job_id": 42,
"status": "SUCCESS",
"frames": [
{"frame_index": 0, "time_sec": 0.0, "frame_path": "frames/10/0.jpg"},
{"frame_index": 30, "time_sec": 1.0, "frame_path": "frames/10/1.jpg"}
],
"error_message": null
}
5.4 视频转文本(异步)
POST /api/v1/video/to-text
// 请求
{
"file_path": "video/202404/001.mp4",
"source_id": 10,
"job_id": 43,
"start_sec": 0,
"end_sec": 120,
"model": "glm-4v-flash",
"prompt_template": "..."
}
// 立即响应(202 Accepted)
{
"message": "任务已接受,后台处理中",
"job_id": 43
}
后台完成后回调:
POST {BACKEND_CALLBACK_URL}
{
"job_id": 43,
"status": "SUCCESS",
"output_path": "video-text/10/1712800000.txt",
"error_message": null
}
5.5 文本问答对生成
POST /api/v1/qa/gen-text
// 请求
{
"items": [
{
"subject": "变压器",
"predicate": "额定电压",
"object": "110kV",
"source_snippet": "该变压器额定电压为110kV,..."
}
],
"model": "glm-4-flash",
"prompt_template": "..."
}
// 响应
{
"pairs": [
{
"question": "变压器的额定电压是多少?",
"answer": "该变压器额定电压为110kV。"
}
]
}
5.6 图像问答对生成
POST /api/v1/qa/gen-image
// 请求
{
"items": [
{
"subject": "电缆接头",
"predicate": "位于",
"object": "配电箱左侧",
"qualifier": "2024年检修现场",
"cropped_image_path": "crops/789/0.jpg"
}
],
"model": "glm-4v-flash",
"prompt_template": "..."
}
// 响应
{
"pairs": [
{
"question": "图中电缆接头位于何处?",
"answer": "图中电缆接头位于配电箱左侧。",
"image_path": "crops/789/0.jpg"
}
]
}
图像 QA 生成时,AI 服务通过 get_presigned_url 获取裁剪图临时访问 URL,构造多模态消息后调用 GLM-4V。
5.7 提交微调任务
POST /api/v1/finetune/start
// 请求
{
"jsonl_url": "https://rustfs.example.com/finetune-export/export/xxx.jsonl",
"base_model": "glm-4-flash",
"hyperparams": {
"learning_rate": 1e-4,
"epochs": 3
}
}
// 响应
{
"job_id": "glm-ft-xxxxxx"
}
5.8 查询微调状态
GET /api/v1/finetune/status/{jobId}
// 响应
{
"job_id": "glm-ft-xxxxxx",
"status": "RUNNING", // RUNNING | SUCCESS | FAILED
"progress": 45,
"error_message": null
}
六、Service 层设计
6.1 text_service — 文档解析 + 三元组提取
1. storage.download_bytes("source-data", file_path) → bytes
2. 按扩展名路由解析器:
.txt → decode("utf-8")
.pdf → pdfplumber.open() 提取全文
.docx → python-docx 遍历段落
3. 拼装 Prompt(系统模板 + 文档正文)
4. llm.chat(messages, model) → JSON 字符串
5. 解析 JSON → 校验字段完整性 → 返回 TripleList
解析器注册表(消除 if-else):
PARSERS: dict[str, Callable[[bytes], str]] = {
".txt": parse_txt,
".pdf": parse_pdf,
".docx": parse_docx,
}
def extract_text(data: bytes, filename: str) -> str:
ext = Path(filename).suffix.lower()
if ext not in PARSERS:
raise UnsupportedFileTypeError(ext)
return PARSERS[ext](data)
6.2 image_service — 四元组提取 + bbox 裁剪
1. storage.download_bytes("source-data", file_path) → bytes
2. 图片 bytes 转 base64,构造 GLM-4V image_url 消息
3. llm.chat_vision(messages, model) → JSON 字符串
4. 解析四元组(含 bbox)
5. 按 bbox 裁剪:
numpy 解码 bytes → cv2 裁剪区域 → cv2.imencode(".jpg") → bytes
6. storage.upload_bytes("source-data", f"crops/{task_id}/{i}.jpg", ...)
7. 返回 QuadrupleList(含 cropped_image_path)
6.3 video_service — OpenCV 抽帧 + 视频转文本
抽帧(BackgroundTask):
1. storage.download_bytes → bytes → 写入 tempfile
2. cv2.VideoCapture 打开临时文件
3. interval 模式:按 frame_interval 步进读帧
keyframe 模式:逐帧计算与前帧的像素差均值,差值超过阈值则判定为场景切换关键帧
(OpenCV 无原生 I 帧检测,用帧差分近似实现)
4. 每帧 cv2.imencode(".jpg") → upload_bytes("source-data", f"frames/{source_id}/{i}.jpg")
5. 清理临时文件
6. httpx.post(BACKEND_CALLBACK_URL, json={job_id, status="SUCCESS", frames=[...]})
异常:回调 status="FAILED", error_message=str(e)
视频转文本(BackgroundTask):
1. download_bytes → tempfile
2. cv2.VideoCapture 在 start_sec~end_sec 区间均匀抽 frame_sample_count 帧
3. 每帧转 base64,构造多图 GLM-4V 消息(含时序说明)
4. llm.chat_vision → 文字描述
5. 描述文本 upload_bytes("source-data", f"video-text/{source_id}/{timestamp}.txt")
6. 回调 Java 后端:output_path + status="SUCCESS"
6.4 qa_service — 问答对生成
文本 QA:
批量拼入三元组 + source_snippet 到 Prompt
llm.chat(messages, model) → 解析问答对 JSON → QAPairList
图像 QA:
遍历四元组列表
storage.get_presigned_url(cropped_image_path) → 临时 URL
构造多模态消息(image_url + 问题指令)
llm.chat_vision → 解析 → 含 image_path 的 QAPairList
6.5 finetune_service — GLM 微调对接
微调 API 属 ZhipuAI 专有能力,无需抽象为通用接口。finetune_service 直接依赖 ZhipuAIClient(通过依赖注入获取后强转类型),不走 LLMClient ABC。
提交:
zhipuai_client._client.fine_tuning.jobs.create(
training_file=jsonl_url,
model=base_model,
hyperparameters=hyperparams
) → job_id
查询:
zhipuai_client._client.fine_tuning.jobs.retrieve(job_id)
→ 映射 status 枚举 RUNNING / SUCCESS / FAILED
七、日志设计
- 使用标准库
logging,JSON 格式输出,与 uvicorn 集成 - 每个请求记录:
method / path / status_code / duration_ms - 每次 GLM 调用记录:
model / prompt_tokens / completion_tokens / duration_ms - BackgroundTask 记录:
job_id / stage / status / error - 不记录文件内容原文(防止敏感数据泄露)
八、异常处理
| 异常类 | HTTP 状态码 | 场景 |
|---|---|---|
UnsupportedFileTypeError |
400 | 文件格式不支持 |
StorageDownloadError |
502 | RustFS 不可达或文件不存在 |
LLMResponseParseError |
502 | GLM 返回非合法 JSON |
LLMCallError |
503 | GLM API 限流 / 超时 |
| 未捕获异常 | 500 | 记录完整 traceback |
所有错误响应统一格式:
{"code": "ERROR_CODE", "message": "具体描述"}
九、RustFS 存储路径规范
| 资源类型 | 存储桶 | 路径格式 |
|---|---|---|
| 上传文本文件 | source-data |
text/{年月}/{source_id}.txt |
| 上传图片 | source-data |
image/{年月}/{source_id}.jpg |
| 上传视频 | source-data |
video/{年月}/{source_id}.mp4 |
| 视频帧模式抽取的帧图 | source-data |
frames/{source_id}/{frame_index}.jpg |
| 视频片段转译输出的文本 | source-data |
video-text/{source_id}/{timestamp}.txt |
| 图像/帧 bbox 裁剪图 | source-data |
crops/{task_id}/{item_index}.jpg |
| 导出 JSONL 文件 | finetune-export |
export/{batchUuid}.jsonl |
十、部署设计
10.1 Dockerfile
FROM python:3.12-slim
WORKDIR /app
# OpenCV 系统依赖
RUN apt-get update && apt-get install -y \
libgl1 libglib2.0-0 \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ ./app/
COPY config.yaml .
COPY .env .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
10.2 docker-compose.yml(ai-service 片段)
ai-service:
build: ./label_ai_service
ports:
- "8000:8000"
env_file:
- ./label_ai_service/.env
depends_on:
- rustfs
- backend
networks:
- label-net
10.3 requirements.txt
fastapi>=0.111
uvicorn[standard]>=0.29
pydantic>=2.7
python-dotenv>=1.0
pyyaml>=6.0
zhipuai>=2.1
boto3>=1.34
pdfplumber>=0.11
python-docx>=1.1
opencv-python-headless>=4.9
numpy>=1.26
httpx>=0.27
十一、关键设计决策
11.1 为何 LLMClient / StorageClient 使用 ABC
当前只实现 ZhipuAI 和 RustFS,但模型选型和对象存储可能随项目演进变化。ABC 约束接口契约,保证替换实现时 services 层零修改。注入点集中在 lifespan,一处修改全局生效。
11.2 为何 ZhipuAI 同步 SDK 在线程池中调用
ZhipuAI 官方 SDK 是同步阻塞调用,直接 await 不生效。通过 loop.run_in_executor(None, ...) 在线程池中运行,不阻塞 FastAPI 的 asyncio 事件循环,保持并发处理能力。
11.3 为何视频任务使用 BackgroundTasks 而非 Celery
项目规模适中,视频处理任务由 ADMIN 手动触发,并发量可控。FastAPI BackgroundTasks 无需额外中间件(Redis 队列、Celery Worker),部署简单,任务状态通过回调接口传递给 Java 后端管理,符合整体架构风格。
11.4 为何图像 QA 生成用 presigned URL 而非 base64
裁剪图已存储在 RustFS,GLM-4V 支持通过 URL 直接访问图片。presigned URL 避免将图片内容重新加载到 AI 服务内存后再 base64 编码,减少内存压力,适合多张图片批量生成的场景。
11.5 config.yaml + .env 分层配置的原因
config.yaml 存结构化、稳定的非敏感配置,可读性好,适合 git 追踪变更历史;.env 存密钥和环境差异项,格式简单,Docker env_file 原生支持,本地开发和容器启动行为一致,无需维护两套配置文件。
文档版本:v1.0 | 生成日期:2026-04-10