diff --git a/.gitea/workflows/issue-branch-preview.yml b/.gitea/workflows/issue-branch-preview.yml
new file mode 100644
index 0000000..4a1e78a
--- /dev/null
+++ b/.gitea/workflows/issue-branch-preview.yml
@@ -0,0 +1,164 @@
+name: issue-branch-preview
+
+on:
+ push:
+ branches-ignore:
+ - main
+ workflow_dispatch:
+ inputs:
+ branch:
+ description: "Target branch (optional, default current ref)"
+ required: false
+ type: string
+ issue:
+ description: "Issue number (optional, auto-parse from branch)"
+ required: false
+ type: string
+
+jobs:
+ allocate-and-deploy:
+ runs-on: ubuntu-latest
+ env:
+ PREVIEW_SLOTS: ${{ vars.PREVIEW_SLOTS }}
+ PREVIEW_URL_TEMPLATE: ${{ vars.PREVIEW_URL_TEMPLATE }}
+ PREVIEW_TTL_HOURS: ${{ vars.PREVIEW_TTL_HOURS }}
+ PREVIEW_STATE_FILE: .tmp/preview-slots.json
+ CLIENT_DEPLOY_CMD: ${{ vars.CLIENT_DEPLOY_CMD }}
+ SERVER_DEPLOY_CMD: ${{ vars.SERVER_DEPLOY_CMD }}
+ FULL_STACK_DEPLOY_CMD: ${{ vars.FULL_STACK_DEPLOY_CMD }}
+ INFRA_APPLY_CMD: ${{ vars.INFRA_APPLY_CMD }}
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: 0
+
+ - name: Setup Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.11"
+
+ - name: Resolve branch and issue
+ id: target
+ shell: bash
+ run: |
+ BRANCH="${{ inputs.branch }}"
+ ISSUE_INPUT="${{ inputs.issue }}"
+ if [ -z "$BRANCH" ]; then
+ BRANCH="${GITHUB_REF_NAME:-$(git rev-parse --abbrev-ref HEAD)}"
+ fi
+
+ ISSUE_ID="$ISSUE_INPUT"
+ if [ -z "$ISSUE_ID" ]; then
+ ISSUE_ID="$(echo "$BRANCH" | sed -nE 's#^issue[-/ ]?([0-9]+).*$#\1#p')"
+ fi
+ if [ -z "$ISSUE_ID" ]; then
+ ISSUE_ID="$(echo "$BRANCH" | sed -nE 's#^.*/([0-9]+).*$#\1#p')"
+ fi
+ if [ -z "$ISSUE_ID" ]; then
+ ISSUE_ID="0"
+ fi
+
+ echo "branch=$BRANCH" >> "$GITHUB_OUTPUT"
+ echo "issue=$ISSUE_ID" >> "$GITHUB_OUTPUT"
+
+ - name: Detect change scope
+ id: scope
+ shell: bash
+ run: |
+ git fetch origin main --depth=1 || true
+ mkdir -p .tmp
+ python skills/gitea-issue-devops-agent/scripts/change_scope.py \
+ --repo-path . \
+ --base-ref origin/main \
+ --head-ref "${{ steps.target.outputs.branch }}" > .tmp/change-scope.json
+ SCOPE="$(python -c "import json;print(json.load(open('.tmp/change-scope.json', encoding='utf-8'))['scope'])")"
+ echo "scope=$SCOPE" >> "$GITHUB_OUTPUT"
+ cat .tmp/change-scope.json
+
+ - name: Allocate preview slot
+ id: slot
+ shell: bash
+ run: |
+ SLOTS="${PREVIEW_SLOTS:-preview-a,preview-b}"
+ TTL="${PREVIEW_TTL_HOURS:-24}"
+ URL_TEMPLATE="${PREVIEW_URL_TEMPLATE:-https://{slot}.qa.example.com}"
+ mkdir -p .tmp
+
+ python skills/gitea-issue-devops-agent/scripts/preview_slot_allocator.py \
+ --state-file "$PREVIEW_STATE_FILE" \
+ --slots "$SLOTS" \
+ --repo "${GITHUB_REPOSITORY}" \
+ --issue "${{ steps.target.outputs.issue }}" \
+ --branch "${{ steps.target.outputs.branch }}" \
+ --ttl-hours "$TTL" \
+ --url-template "$URL_TEMPLATE" \
+ --evict-oldest > .tmp/slot-allocation.json
+
+ SLOT="$(python -c "import json;d=json.load(open('.tmp/slot-allocation.json', encoding='utf-8'));print(d.get('allocation',{}).get('slot',''))")"
+ URL="$(python -c "import json;d=json.load(open('.tmp/slot-allocation.json', encoding='utf-8'));print(d.get('allocation',{}).get('url',''))")"
+ echo "slot=$SLOT" >> "$GITHUB_OUTPUT"
+ echo "url=$URL" >> "$GITHUB_OUTPUT"
+ cat .tmp/slot-allocation.json
+
+ - name: Deploy by scope
+ shell: bash
+ run: |
+ set -euo pipefail
+ SCOPE="${{ steps.scope.outputs.scope }}"
+ run_or_echo () {
+ local cmd="$1"
+ local fallback="$2"
+ if [ -n "$cmd" ]; then
+ bash -lc "$cmd"
+ else
+ echo "$fallback"
+ fi
+ }
+
+ case "$SCOPE" in
+ skip)
+ echo "Scope=skip: docs/tests-only or no changes, deployment skipped."
+ ;;
+ client_only)
+ run_or_echo "${CLIENT_DEPLOY_CMD:-}" "Scope=client_only: set repo var CLIENT_DEPLOY_CMD."
+ ;;
+ server_only)
+ run_or_echo "${SERVER_DEPLOY_CMD:-}" "Scope=server_only: set repo var SERVER_DEPLOY_CMD."
+ ;;
+ full_stack)
+ run_or_echo "${FULL_STACK_DEPLOY_CMD:-}" "Scope=full_stack: set repo var FULL_STACK_DEPLOY_CMD."
+ ;;
+ infra_only)
+ run_or_echo "${INFRA_APPLY_CMD:-}" "Scope=infra_only: set repo var INFRA_APPLY_CMD."
+ ;;
+ *)
+ echo "Unknown scope: $SCOPE"
+ exit 1
+ ;;
+ esac
+
+ - name: Persist preview slot state
+ shell: bash
+ run: |
+ if [ ! -f "$PREVIEW_STATE_FILE" ]; then
+ exit 0
+ fi
+ if [ -z "$(git status --porcelain -- "$PREVIEW_STATE_FILE")" ]; then
+ exit 0
+ fi
+
+ git config user.name "gitea-actions"
+ git config user.email "gitea-actions@local"
+ git add "$PREVIEW_STATE_FILE"
+ git commit -m "chore: update preview slot state [skip ci]" || true
+ git push || true
+
+ - name: Summary
+ shell: bash
+ run: |
+ echo "branch: ${{ steps.target.outputs.branch }}"
+ echo "issue: ${{ steps.target.outputs.issue }}"
+ echo "scope: ${{ steps.scope.outputs.scope }}"
+ echo "slot: ${{ steps.slot.outputs.slot }}"
+ echo "url: ${{ steps.slot.outputs.url }}"
diff --git a/.gitea/workflows/preview-slot-reclaim.yml b/.gitea/workflows/preview-slot-reclaim.yml
new file mode 100644
index 0000000..5db4d4a
--- /dev/null
+++ b/.gitea/workflows/preview-slot-reclaim.yml
@@ -0,0 +1,83 @@
+name: preview-slot-reclaim
+
+on:
+ schedule:
+ - cron: "15 * * * *"
+ workflow_dispatch:
+ inputs:
+ repo:
+ description: "owner/repo (optional)"
+ required: false
+ type: string
+ branch:
+ description: "Branch to release (optional)"
+ required: false
+ type: string
+ issue:
+ description: "Issue number to release (optional)"
+ required: false
+ type: string
+
+jobs:
+ reclaim:
+ runs-on: ubuntu-latest
+ env:
+ PREVIEW_SLOTS: ${{ vars.PREVIEW_SLOTS }}
+ PREVIEW_STATE_FILE: .tmp/preview-slots.json
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: 0
+
+ - name: Setup Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.11"
+
+ - name: Release target or prune expired
+ shell: bash
+ run: |
+ SLOTS="${PREVIEW_SLOTS:-preview-a,preview-b}"
+ REPO_INPUT="${{ inputs.repo }}"
+ BRANCH_INPUT="${{ inputs.branch }}"
+ ISSUE_INPUT="${{ inputs.issue }}"
+
+ mkdir -p .tmp
+
+ if [ -n "$REPO_INPUT" ] || [ -n "$BRANCH_INPUT" ] || [ -n "$ISSUE_INPUT" ]; then
+ ISSUE_FLAG=""
+ if [ -n "$ISSUE_INPUT" ]; then
+ ISSUE_FLAG="--issue $ISSUE_INPUT"
+ fi
+ python skills/gitea-issue-devops-agent/scripts/preview_slot_allocator.py \
+ --state-file "$PREVIEW_STATE_FILE" \
+ --slots "$SLOTS" \
+ --repo "$REPO_INPUT" \
+ --branch "$BRANCH_INPUT" \
+ $ISSUE_FLAG \
+ --release > .tmp/slot-reclaim.json
+ else
+ python skills/gitea-issue-devops-agent/scripts/preview_slot_allocator.py \
+ --state-file "$PREVIEW_STATE_FILE" \
+ --slots "$SLOTS" \
+ --list > .tmp/slot-reclaim.json
+ fi
+
+ cat .tmp/slot-reclaim.json
+
+ - name: Persist state
+ shell: bash
+ run: |
+ if [ ! -f "$PREVIEW_STATE_FILE" ]; then
+ exit 0
+ fi
+ if [ -z "$(git status --porcelain -- "$PREVIEW_STATE_FILE")" ]; then
+ exit 0
+ fi
+
+ git config user.name "gitea-actions"
+ git config user.email "gitea-actions@local"
+ git add "$PREVIEW_STATE_FILE"
+ git commit -m "chore: reclaim preview slots [skip ci]" || true
+ git push || true
diff --git a/README.md b/README.md
index 1b406f0..42ad0cd 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,55 @@
-# devops-skills
+# DevOps Skills
-Devops驱动开发场景投喂工具
\ No newline at end of file
+面向 **Gitea Issue 驱动交付** 的技能仓库,内置 `gitea-issue-devops-agent`,支持:
+
+- 根据 issue 指定分支执行修复与提测
+- 分支级预览环境槽位分配与回收
+- 按变更范围智能部署(避免无意义重启服务端)
+- 自动 / 半自动 / 全人工 三种协作模式
+- 图片类 issue 证据抓取与审阅
+
+## 文档网页
+
+- 站点文件:`site/index.html`
+- 仓库内查看:`https://fun-md.com/Fun_MD/devops-skills/src/branch/main/site/index.html`
+- 原始页面:`https://fun-md.com/Fun_MD/devops-skills/raw/branch/main/site/index.html`
+
+## 技能路径
+
+- `skills/gitea-issue-devops-agent/SKILL.md`
+
+## 安装
+
+```bash
+git clone https://fun-md.com/Fun_MD/devops-skills.git
+cd devops-skills
+mkdir -p ~/.codex/skills
+cp -r skills/gitea-issue-devops-agent ~/.codex/skills/
+```
+
+Windows PowerShell:
+
+```powershell
+git clone https://fun-md.com/Fun_MD/devops-skills.git
+cd devops-skills
+New-Item -ItemType Directory -Force $HOME\.codex\skills | Out-Null
+Copy-Item .\skills\gitea-issue-devops-agent $HOME\.codex\skills\gitea-issue-devops-agent -Recurse -Force
+```
+
+## 核心脚本
+
+- `skills/gitea-issue-devops-agent/scripts/issue_audit.py`
+ - issue 拉取、质量评分、去重、附件/图片抓取
+- `skills/gitea-issue-devops-agent/scripts/change_scope.py`
+ - 识别 `skip/client_only/server_only/full_stack/infra_only`
+- `skills/gitea-issue-devops-agent/scripts/preview_slot_allocator.py`
+ - 分支预览槽位分配、复用、释放、TTL 回收
+
+## 工作流模板
+
+仓库提供 `.gitea/workflows` 示例,可直接接入:
+
+- `.gitea/workflows/issue-branch-preview.yml`
+- `.gitea/workflows/preview-slot-reclaim.yml`
+
+用于实现“分配槽位 + 按变更范围部署 + 自动回收”。
diff --git a/site/index.html b/site/index.html
new file mode 100644
index 0000000..f3f543f
--- /dev/null
+++ b/site/index.html
@@ -0,0 +1,204 @@
+
+
+
+
+
+ Gitea Issue DevOps Agent
+
+
+
+
+
+ Gitea Issue DevOps Agent
+
+ 一个把 Issue → Branch → Preview Env → 测试闭环 固化到技能与脚本中的交付方案。
+ 核心目标是提升交付速度,同时避免“每个分支都全量起服务”的资源浪费。
+
+
+ 自动 / 半自动 / 全人工
+ Issue 图片证据抓取
+ 变更范围智能部署
+ 槽位池自动回收
+
+
+
+
+ 核心价值
+
+
+ 1. 分支隔离提测
+ 每个 issue 绑定分支与预览槽位,主干环境保持稳定,避免相互覆盖。
+
+
+ 2. 资源按需分配
+ 根据变更范围判断 client_only/server_only/full_stack,不变更服务端就不重启服务端。
+
+
+ 3. 可审计闭环
+ 每次提测都可回溯到 commit、测试结果、环境 URL、验证步骤,且合并始终由工程师人工确认。
+
+
+
+
+
+ 安装指南
+ 1) 获取技能仓库
+ git clone https://fun-md.com/Fun_MD/devops-skills.git
+cd devops-skills
+ 2) 安装到 Codex skills
+ # Linux / macOS
+mkdir -p ~/.codex/skills
+cp -r skills/gitea-issue-devops-agent ~/.codex/skills/
+
+# Windows PowerShell
+New-Item -ItemType Directory -Force $HOME\.codex\skills | Out-Null
+Copy-Item .\skills\gitea-issue-devops-agent $HOME\.codex\skills\gitea-issue-devops-agent -Recurse -Force
+ 3) 首次引导参数
+
+ repo_url(仓库地址)
+ api_key(具备 issue 读写权限)
+ mode:automatic / semi-automatic / manual
+ - 可选:
reviewers、test_entry、deploy_env、health_endpoint、min_quality_score
+
+
+
+
+ 工具使用说明
+ issue_audit.py(拉取 issue + 图片证据)
+ python skills/gitea-issue-devops-agent/scripts/issue_audit.py \
+ --base-url https://fun-md.com \
+ --repo FunMD/document-collab \
+ --token <GITEA_TOKEN> \
+ --state all \
+ --download-attachments \
+ --output-dir .tmp/issue-audit
+
+ change_scope.py(按改动范围决策部署)
+ python skills/gitea-issue-devops-agent/scripts/change_scope.py \
+ --repo-path . \
+ --base-ref origin/main \
+ --head-ref HEAD
+
+ preview_slot_allocator.py(分配 / 复用 / 释放槽位)
+ python skills/gitea-issue-devops-agent/scripts/preview_slot_allocator.py \
+ --state-file .tmp/preview-slots.json \
+ --slots preview-a,preview-b \
+ --repo FunMD/document-collab \
+ --issue 48 \
+ --branch dev \
+ --ttl-hours 24 \
+ --url-template https://{slot}.qa.example.com \
+ --evict-oldest
+
+
+
+ .gitea/workflows 接入
+
+ 本仓库已包含示例工作流:.gitea/workflows/issue-branch-preview.yml 与
+ .gitea/workflows/preview-slot-reclaim.yml,用于完成以下自动化链路:
+
+
+ - push 到 issue 分支后:自动分配槽位 + 变更范围识别 + 选择性部署
+ - issue 关闭 / 定时任务:自动释放或回收过期槽位
+
+ 建议先在测试仓库验证工作流变量后再推广到生产仓库。
+
+
+
+
+
+
diff --git a/skills/gitea-issue-devops-agent/SKILL.md b/skills/gitea-issue-devops-agent/SKILL.md
new file mode 100644
index 0000000..d7a347e
--- /dev/null
+++ b/skills/gitea-issue-devops-agent/SKILL.md
@@ -0,0 +1,234 @@
+---
+name: gitea-issue-devops-agent
+description: End-to-end Gitea issue delivery workflow with guided onboarding, branch-scoped preview environments, and resource-aware deployment decisions. Use when tasks involve connecting to Gitea, processing text/image issues, fixing code on issue-specified branches, allocating/reusing test environments per branch, running test submission loops, coordinating review approvals, and closing issues only after verified delivery and engineer-confirmed merge.
+---
+
+# Gitea Issue DevOps Agent
+
+## Mandatory Guided Start
+
+Run this interaction before any coding or issue action:
+
+1. Ask for repository address:
+ - preferred: full URL `https:////`
+ - fallback: `base_url` + `owner/repo`
+2. Ask for API key/token with issue read/write permissions.
+3. Ask user to select mode:
+ - `automatic`
+ - `semi-automatic`
+ - `manual` (non-automatic)
+4. Ask optional defaults:
+ - designated reviewers (for semi-automatic mode)
+ - branch test submission entrypoint (CI command/job)
+ - environment policy:
+ - stable main URL (`main` fixed test env)
+ - optional shared QA URL
+ - preview slot pool (for issue branches), e.g. `preview-a,preview-b`
+ - preview URL template, e.g. `https://{slot}.qa.example.com`
+ - deployment environment + health endpoint
+ - minimum issue quality score (default `70`)
+5. Validate connectivity by running:
+ - `python scripts/issue_audit.py --repo --base-url --token --state all --download-attachments --output-dir .tmp/issue-audit`
+6. Initialize preview-slot state (if branch previews enabled):
+ - `python scripts/preview_slot_allocator.py --state-file .tmp/preview-slots.json --slots --list`
+7. Echo back the selected mode and all gate rules, then start work.
+
+If repository or token is missing/invalid, stop and request correction. Never start development without a successful connectivity check.
+
+## Mode Definitions
+
+### 1) Automatic Mode
+
+- Read issue-specified branch and work on that branch.
+- Implement fix, run checks, push branch, allocate/reuse branch preview env, and trigger branch test submission automatically.
+- Monitor test results and issue feedback, then iterate on the same branch until pass.
+- Close issue only after evidence is complete.
+- Merge is still blocked until an engineer explicitly confirms merge approval.
+
+### 2) Semi-Automatic Mode
+
+- Read issue-specified branch and work on that branch.
+- Implement and push fix.
+- Notify designated reviewer with change summary, risk, and test plan.
+- Wait for explicit human review approval.
+- After approval, allocate/reuse branch preview env, trigger branch test submission and continue loop.
+- Close issue only after evidence is complete.
+- Merge is still blocked until an engineer explicitly confirms merge approval.
+
+### 3) Manual Mode (Non-Automatic)
+
+Require explicit human confirmation before each major action:
+
+- selecting issue
+- confirming target branch
+- applying code changes
+- pushing commits
+- triggering tests/deploy
+- closing/reopening issue
+- executing merge
+
+No autonomous transition is allowed in manual mode.
+
+## Branch-First Rules
+
+- Treat issue-declared branch as the source of truth.
+- Accept branch hints from issue fields/body/comments (example: `branch: feat/login-fix`).
+- If branch is missing or ambiguous, ask user/reporter and pause that issue.
+- Do not silently switch branches.
+- Keep one active issue per branch unless user explicitly approves batching.
+
+## Environment Model (Required)
+
+Always avoid `main` and issue branches overwriting each other.
+
+1. `main` fixed env (stable):
+ - one permanent URL for regression/baseline testing
+2. optional shared QA env:
+ - integration testing across multiple completed branches
+3. issue preview slot env (ephemeral pool):
+ - small fixed pool (`N` slots, e.g. 2)
+ - one active branch binds to one slot
+ - issue comments must include slot + URL + branch
+ - close/merge/TTL expiry releases slot
+
+Never deploy different branches to the same fixed URL unless user explicitly approves override.
+
+## Issue -> Branch -> Environment Binding
+
+- Binding key: `##`
+- Environment selection:
+ - if branch already has assigned slot: reuse same slot
+ - else allocate free slot from pool
+ - if no free slot:
+ - in `automatic`: evict oldest expired/inactive slot if policy allows
+ - in `semi-automatic` / `manual`: request explicit confirmation before eviction
+- Persist slot state in `.tmp/preview-slots.json` via `scripts/preview_slot_allocator.py`
+
+## Resource-Aware Deployment Strategy (Required)
+
+Before every branch test submission, detect change scope:
+
+- `python scripts/change_scope.py --repo-path --base-ref --head-ref `
+
+Use the scope result to minimize resource usage:
+
+1. `skip` (docs/tests/chore-only):
+ - do not deploy
+ - post no-op verification evidence
+2. `client_only`:
+ - build/deploy client only
+ - reuse existing shared/stable server
+ - do not start a dedicated server for this branch
+3. `server_only`:
+ - deploy/restart server only
+ - keep existing client if unchanged
+4. `full_stack`:
+ - deploy both client and server
+5. `infra_only`:
+ - apply infra/workflow changes; restart only required components
+
+Hard rule:
+- If server-related scope is unchanged, do not provision/restart dedicated server processes for that issue branch.
+
+## Standard Workflow (All Modes)
+
+### 1) Intake and Prioritization
+
+- Pull issues, comments, and attachments from Gitea API.
+- If issue text/comments indicate image evidence but `attachments_downloaded` is `0`, stop and report image-intake failure before coding.
+- Prioritize in this order:
+ - `closed_but_unresolved`
+ - `open` + `quality_score >= min_quality_score`
+ - `open` + `quality_score < min_quality_score` (request details first)
+ - `closed_open_reopen_candidates`
+- For issues with images, inspect attachments before coding.
+
+### 2) Deduplication and Quality Gate
+
+- Group issues by semantic intent, not literal wording.
+- Keep one parent issue for implementation.
+- Use `references/triage-standard.md` for score and comment templates.
+- For low-quality issues, request details and mark as `needs-info`.
+
+### 3) Fix Execution
+
+- Prefer small, reversible patches.
+- Link every code change to issue ID in commit or PR/MR notes.
+- Split cross-cutting work into incremental commits.
+
+### 4) Verification Gate
+
+- Required:
+ - build/compile passes
+ - affected unit/integration tests pass
+ - smoke path for reported scenario passes
+- For UI/image issues:
+ - compare before/after screenshots
+ - verify in at least one Chromium browser
+
+### 5) Branch Test Submission ("提测")
+
+- Submit testing on the issue branch (CI pipeline + branch preview env).
+- Allocate/reuse branch slot before submission.
+- Apply resource-aware deployment decision from change scope.
+- Post evidence in issue comment:
+ - commit SHA
+ - test run URL and result
+ - environment/slot/URL
+ - deployment scope (`skip`/`client_only`/`server_only`/`full_stack`/`infra_only`)
+ - shared backend reused or dedicated backend started
+ - verification steps
+- If fail/reject, iterate on same branch and re-submit.
+
+### 6) Loop Control
+
+- Continue `fix -> test submission -> feedback -> fix` until done.
+- Reopen immediately if verification fails or regression appears.
+- Do not close based on title-only or assumption-only validation.
+
+### 7) Closure Rule
+
+Close issue only when all are true:
+
+- root cause identified
+- fix verified with reproducible evidence
+- test submission passed
+- closure comment includes commit/test/deploy evidence
+
+### 8) Merge Rule (Always Human-Confirmed)
+
+- Final merge must be approved by an engineer in all modes.
+- Agent can prepare merge notes/checklist, but must wait for explicit merge confirmation.
+- Merge only after confirmation, then post final release evidence.
+
+### 9) Environment Cleanup
+
+- On issue close/merge:
+ - release preview slot
+ - stop branch-only processes (if any)
+ - keep main/shared env untouched
+- On TTL expiry:
+ - reclaim idle slot automatically (automatic mode) or after confirmation (semi/manual)
+
+## Script Usage
+
+- `scripts/issue_audit.py`: collect issues/comments/attachments, detect duplicates, score quality, detect unresolved/closed-open links, extract issue branch hints, and generate reports.
+ - image intake uses three sources: markdown/html links, payload `assets/attachments` fields, and `/issues/*/assets` API endpoints.
+ - if your Gitea blocks the assets endpoints, pass `--skip-asset-endpoints` and rely on payload extraction.
+- `scripts/preview_slot_allocator.py`: allocate/reuse/release/list preview slots by issue+branch.
+ - allocate example:
+ - `python scripts/preview_slot_allocator.py --state-file .tmp/preview-slots.json --slots preview-a,preview-b --repo --issue 48 --branch dev --ttl-hours 24 --url-template https://{slot}.qa.example.com`
+ - release example:
+ - `python scripts/preview_slot_allocator.py --state-file .tmp/preview-slots.json --slots preview-a,preview-b --release --repo --issue 48 --branch dev`
+- `scripts/change_scope.py`: detect changed scope and recommend minimum deploy strategy.
+ - `python scripts/change_scope.py --repo-path --base-ref origin/main --head-ref HEAD`
+- `references/triage-standard.md`: scoring rubric and templates for needs-info, review request, test submission, and merge approval.
+
+## Operational Constraints
+
+- Never bulk-close issues without per-issue verification evidence.
+- Never ignore attachment images for UI/interaction issues.
+- Never merge feature requests and bugfixes into one untraceable commit.
+- Never bypass engineer merge confirmation.
+- Never allow branch previews to overwrite main stable env.
+- Never start dedicated branch server when scope indicates client-only changes.
diff --git a/skills/gitea-issue-devops-agent/agents/openai.yaml b/skills/gitea-issue-devops-agent/agents/openai.yaml
new file mode 100644
index 0000000..6394055
--- /dev/null
+++ b/skills/gitea-issue-devops-agent/agents/openai.yaml
@@ -0,0 +1,4 @@
+interface:
+ display_name: "Gitea Issue DevOps Agent"
+ short_description: "Guided Gitea issue delivery with execution modes, branch preview slots, and resource-aware deployments"
+ default_prompt: "Start with guided setup (repo URL, API key, mode, env policy), process issues on issue-specified branches, bind each branch to preview slots, decide deploy scope from git diff (skip/client-only/server-only/full-stack), avoid dedicated server restarts when backend is unchanged, run fix/test loops, and require engineer confirmation before final merge."
diff --git a/skills/gitea-issue-devops-agent/references/triage-standard.md b/skills/gitea-issue-devops-agent/references/triage-standard.md
new file mode 100644
index 0000000..62b37f1
--- /dev/null
+++ b/skills/gitea-issue-devops-agent/references/triage-standard.md
@@ -0,0 +1,140 @@
+# Triage Standard
+
+## Quality Score (0-100)
+
+- `+20` Expected vs actual is explicit.
+- `+20` Reproduction steps are explicit.
+- `+15` Environment is explicit (browser/device/version).
+- `+15` Attachment exists (image/video/log).
+- `+10` Title is specific enough to infer scope.
+- `+20` Body has enough detail for engineering action.
+
+`pass` = score `>= 70`.
+
+## Status Decision
+
+- `closed_but_unresolved`: issue is closed but has clear reporter feedback indicating “still not fixed”.
+- `closed_open_reopen_candidate`: issue is closed, but a newer open issue has high title/body similarity and should be manually reviewed for mis-close or regression.
+- `ready_for_fix`: open issue, quality pass, reproducible.
+- `needs-info`: open issue, quality below threshold.
+- `duplicate`: semantically same as another issue.
+- `enhancement-epic`: feature scope too large for a single fix cycle; split into sub-issues first.
+
+## Mandatory Review Gates
+
+- `image-first`: when issue body/comments contain screenshots, review the image evidence before coding.
+- `image-retrieval-proof`: include `attachment_urls_detected` and `attachments_downloaded` from audit summary to prove image intake worked.
+- `real-ai-only`: for AI conversation/generation issues, verify against real provider APIs with valid keys. Do not close based on mock-only behavior.
+- `closure-evidence`: close only with commit, test result, deploy proof, and verification path.
+
+## Needs-Info Comment Template
+
+Use this comment when score is below threshold:
+
+```text
+[issue-quality-feedback-v1]
+当前工单暂不满足开发准入标准,已进入待补充队列。请补充以下信息后我们将立即纳入修复流程:
+1) 复现步骤(至少 3 步)
+2) 期望结果 vs 实际结果
+3) 环境信息(浏览器/系统/时间)
+4) 截图或录屏(建议标注异常区域)
+```
+
+## Guided Start Template
+
+Use this at session start before any implementation:
+
+```text
+[devops-startup-check-v1]
+开始处理前请提供:
+1) 仓库地址(完整 URL 或 base_url + owner/repo)
+2) API Key(具备 issue 读写权限)
+3) 执行模式(三选一):
+ - automatic:自动修复+提测循环,最终合并仍需工程师确认
+ - semi-automatic:修复后先人工 review,再提测循环
+ - manual:全流程人工确认
+可选:指定 reviewer、提测命令、部署环境、健康检查地址。
+可选(推荐):主环境 URL、共享 QA URL、预览槽位池(如 preview-a/preview-b)和预览 URL 模板。
+```
+
+## Review Request Template (Semi-Automatic)
+
+```text
+[issue-review-request-v1]
+已完成本轮修复,等待指定 reviewer 确认后进入提测:
+- issue: #
+- branch:
+- commit:
+- change summary:
+- risk:
+- test plan:
+请回复“review-approved”或给出修改意见。
+```
+
+## Test Submission Template
+
+```text
+[issue-test-submit-v1]
+已按分支提测:
+- issue: #
+- branch:
+- commit:
+- pipeline/test run:
+- environment:
+- preview slot:
+- preview url:
+- deploy scope:
+- server strategy:
+- verify steps:
+如失败或结果不符合预期,将继续同分支迭代修复。
+```
+
+## Preview Slot Allocation Template
+
+```text
+[issue-preview-slot-v1]
+已分配提测环境(按 issue+branch 绑定):
+- issue: #
+- branch:
+- slot:
+- preview url:
+- ttl:
+说明:同一分支会复用该 slot;关闭/合并后自动释放。
+```
+
+## Preview Slot Release Template
+
+```text
+[issue-preview-release-v1]
+已释放提测环境:
+- issue: #
+- branch:
+- slot:
+- reason:
+```
+
+## Merge Approval Template
+
+```text
+[merge-approval-check-v1]
+准备执行最终合并,请工程师确认:
+- issue: #
+- branch:
+- target:
+- review status:
+- test status:
+- release evidence:
+请明确回复“merge-approved”后再执行合并。
+```
+
+## Closure Comment Template
+
+```text
+[issue-verified-close-v1]
+已修复并发布。
+- commit:
+- tests:
+- deploy:
+- verify:
+如仍可复现,请附最新截图和复现步骤,我们将立即 reopen。
+```
diff --git a/skills/gitea-issue-devops-agent/scripts/change_scope.py b/skills/gitea-issue-devops-agent/scripts/change_scope.py
new file mode 100644
index 0000000..ac7135c
--- /dev/null
+++ b/skills/gitea-issue-devops-agent/scripts/change_scope.py
@@ -0,0 +1,184 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import json
+import subprocess
+import sys
+from pathlib import Path
+from typing import Any
+
+DOC_EXTS = {".md", ".rst", ".txt", ".adoc"}
+TEST_MARKERS = ("/tests/", "/test/", "__tests__", ".spec.", ".test.")
+CLIENT_PREFIXES = ("client/", "frontend/", "web/", "ui/")
+SERVER_PREFIXES = ("server/", "backend/", "api/", "services/", "worker/")
+INFRA_PREFIXES = (
+ "deploy/",
+ ".gitea/workflows/",
+ ".github/workflows/",
+ "infra/",
+ "k8s/",
+ "helm/",
+ "nginx/",
+)
+SHARED_RUNTIME_FILES = {
+ "package.json",
+ "package-lock.json",
+ "pnpm-lock.yaml",
+ "yarn.lock",
+ "requirements.txt",
+ "pyproject.toml",
+ "poetry.lock",
+ "Dockerfile",
+ "docker-compose.yml",
+ "docker-compose.yaml",
+}
+
+
+def run_git(cwd: Path, *args: str) -> str:
+ result = subprocess.run(
+ ["git", *args],
+ cwd=str(cwd),
+ check=False,
+ capture_output=True,
+ text=True,
+ encoding="utf-8",
+ )
+ if result.returncode != 0:
+ raise RuntimeError(result.stderr.strip() or f"git {' '.join(args)} failed")
+ return result.stdout.strip()
+
+
+def normalize_path(raw: str) -> str:
+ return raw.replace("\\", "/").lstrip("./")
+
+
+def classify_file(path: str) -> set[str]:
+ categories: set[str] = set()
+ normalized = normalize_path(path)
+ lower = normalized.lower()
+ suffix = Path(lower).suffix
+
+ if lower.startswith("docs/") or suffix in DOC_EXTS:
+ categories.add("docs")
+ if any(marker in lower for marker in TEST_MARKERS):
+ categories.add("tests")
+ if any(lower.startswith(prefix) for prefix in CLIENT_PREFIXES):
+ categories.add("client")
+ if any(lower.startswith(prefix) for prefix in SERVER_PREFIXES):
+ categories.add("server")
+ if any(lower.startswith(prefix) for prefix in INFRA_PREFIXES):
+ categories.add("infra")
+ if Path(lower).name in SHARED_RUNTIME_FILES:
+ categories.add("shared_runtime")
+
+ if not categories:
+ categories.add("unknown")
+ return categories
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(
+ description="Detect branch change scope and recommend minimum deployment strategy."
+ )
+ parser.add_argument("--repo-path", default=".", help="Local git repository path")
+ parser.add_argument("--base-ref", required=True, help="Base ref, e.g. origin/main")
+ parser.add_argument("--head-ref", default="HEAD", help="Head ref, e.g. feature branch or SHA")
+ parser.add_argument("--no-files", action="store_true", help="Do not include changed file list in output")
+ args = parser.parse_args()
+
+ repo_path = Path(args.repo_path).resolve()
+
+ try:
+ merge_base = run_git(repo_path, "merge-base", args.base_ref, args.head_ref)
+ diff_output = run_git(repo_path, "diff", "--name-only", f"{merge_base}..{args.head_ref}")
+ except Exception as error: # noqa: BLE001
+ print(json.dumps({"error": str(error)}, ensure_ascii=False, indent=2))
+ sys.exit(2)
+
+ changed_files = [normalize_path(line) for line in diff_output.splitlines() if line.strip()]
+ file_classes: list[dict[str, Any]] = []
+ category_union: set[str] = set()
+ for path in changed_files:
+ categories = classify_file(path)
+ category_union.update(categories)
+ file_classes.append({"path": path, "categories": sorted(categories)})
+
+ has_docs = "docs" in category_union
+ has_tests = "tests" in category_union
+ has_client = "client" in category_union
+ has_server = "server" in category_union
+ has_infra = "infra" in category_union
+ has_shared_runtime = "shared_runtime" in category_union
+ has_unknown = "unknown" in category_union
+
+ only_docs_tests = bool(changed_files) and category_union.issubset({"docs", "tests"})
+ scope = "skip"
+ reasons: list[str] = []
+
+ if not changed_files:
+ scope = "skip"
+ reasons.append("no changed files")
+ elif only_docs_tests:
+ scope = "skip"
+ reasons.append("docs/tests-only changes")
+ else:
+ require_server = has_server or has_shared_runtime
+ require_client = has_client or has_shared_runtime
+
+ if has_unknown:
+ scope = "full_stack"
+ reasons.append("unknown file changes detected -> conservative full-stack deploy")
+ require_server = True
+ require_client = True
+ elif has_infra and not require_server and not require_client:
+ scope = "infra_only"
+ reasons.append("infra/workflow-only changes")
+ elif require_client and not require_server:
+ scope = "client_only"
+ reasons.append("client-only changes")
+ elif require_server and not require_client:
+ scope = "server_only"
+ reasons.append("server-only changes")
+ else:
+ scope = "full_stack"
+ reasons.append("client/server/shared runtime changes")
+
+ should_restart_server = scope in {"server_only", "full_stack"}
+ should_restart_client = scope in {"client_only", "full_stack"}
+
+ payload: dict[str, Any] = {
+ "repo_path": str(repo_path.as_posix()),
+ "base_ref": args.base_ref,
+ "head_ref": args.head_ref,
+ "merge_base": merge_base,
+ "changed_files_count": len(changed_files),
+ "scope": scope,
+ "categories": {
+ "docs": has_docs,
+ "tests": has_tests,
+ "client": has_client,
+ "server": has_server,
+ "infra": has_infra,
+ "shared_runtime": has_shared_runtime,
+ "unknown": has_unknown,
+ },
+ "only_docs_tests": only_docs_tests,
+ "actions": {
+ "skip_deploy": scope == "skip",
+ "should_restart_client": should_restart_client,
+ "should_restart_server": should_restart_server,
+ "reuse_shared_server": not should_restart_server,
+ "requires_dedicated_server": should_restart_server,
+ },
+ "reasons": reasons,
+ }
+ if not args.no_files:
+ payload["changed_files"] = changed_files
+ payload["file_classification"] = file_classes
+
+ print(json.dumps(payload, ensure_ascii=False, indent=2))
+
+
+if __name__ == "__main__":
+ main()
diff --git a/skills/gitea-issue-devops-agent/scripts/issue_audit.py b/skills/gitea-issue-devops-agent/scripts/issue_audit.py
new file mode 100644
index 0000000..033f376
--- /dev/null
+++ b/skills/gitea-issue-devops-agent/scripts/issue_audit.py
@@ -0,0 +1,873 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import html
+import hashlib
+import json
+import re
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from difflib import SequenceMatcher
+from pathlib import Path
+from typing import Any
+from urllib.error import HTTPError, URLError
+from urllib.parse import urlencode, urljoin, urlparse
+from urllib.request import Request, urlopen
+
+IMG_MD_RE = re.compile(
+ r"!\[[^\]]*\]\(\s*([^\s>)]+)(?:\s+[\"'][^\"']*[\"'])?\s*\)"
+)
+IMG_HTML_RE = re.compile(r"
]+src=[\"']([^\"']+)[\"']", re.IGNORECASE)
+IMG_URL_RE = re.compile(r"(https?://[^\s)]+?\.(?:png|jpg|jpeg|gif|webp|svg))", re.IGNORECASE)
+ATTACHMENT_PATH_RE = re.compile(
+ r"((?:https?://[^\s)\"'>]+)?/(?:attachments|repo-attachments|api/v1/repos/[^\s)\"'>]+/issues(?:/comments)?/\d+/assets/\d+)[^\s)\"'>]*)",
+ re.IGNORECASE,
+)
+UNRESOLVED_KEYWORDS = (
+ "未修复",
+ "没有修复",
+ "问题还在",
+ "依旧",
+ "仍然",
+ "还是",
+ "无法",
+ "没解决",
+ "still not fixed",
+ "not fixed",
+ "cannot reproduce? no",
+ "failed",
+ "broken",
+)
+QUALITY_MARKER = "[issue-quality-feedback-v1]"
+BRANCH_LABEL_RE = re.compile(
+ r"(?:^|[\r\n])\s*(?:branch|target branch|working branch|fix branch|分支|目标分支)\s*[::=]\s*`?([A-Za-z0-9._/\-]+)`?",
+ re.IGNORECASE,
+)
+BRANCH_INLINE_RE = re.compile(
+ r"(?:^|[\s,;])(?:/branch|branch)\s+`?([A-Za-z0-9._/\-]+)`?",
+ re.IGNORECASE,
+)
+BRANCH_ALLOWED_RE = re.compile(r"^[A-Za-z0-9._/\-]+$")
+
+
+@dataclass
+class IssueEntry:
+ number: int
+ state: str
+ title: str
+ body: str
+ created_at: str
+ updated_at: str
+ closed_at: str | None
+ comments: list[dict[str, Any]]
+ attachments: list[str]
+ quality_score: int
+ target_branch: str | None
+
+ def brief(self) -> dict[str, Any]:
+ return {
+ "number": self.number,
+ "state": self.state,
+ "title": self.title,
+ "quality_score": self.quality_score,
+ "target_branch": self.target_branch,
+ "attachments": len(self.attachments),
+ "created_at": self.created_at,
+ "updated_at": self.updated_at,
+ "closed_at": self.closed_at,
+ }
+
+
+def _to_datetime(value: str | None) -> datetime | None:
+ raw = (value or "").strip()
+ if not raw:
+ return None
+ try:
+ return datetime.fromisoformat(raw.replace("Z", "+00:00"))
+ except ValueError:
+ return None
+
+
+def _request_json(
+ base_url: str,
+ token: str,
+ path: str,
+ query: dict[str, Any] | None = None,
+ method: str = "GET",
+ body: dict[str, Any] | None = None,
+) -> Any:
+ query_str = f"?{urlencode(query)}" if query else ""
+ url = f"{base_url.rstrip('/')}{path}{query_str}"
+ payload = None if body is None else json.dumps(body).encode("utf-8")
+ req = Request(
+ url,
+ method=method,
+ headers={
+ "Authorization": f"token {token}",
+ "Content-Type": "application/json",
+ "Accept": "application/json",
+ },
+ data=payload,
+ )
+ with urlopen(req, timeout=30) as resp:
+ raw = resp.read().decode("utf-8")
+ return json.loads(raw) if raw else None
+
+
+def _request_binary(url: str, token: str) -> tuple[bytes, str | None]:
+ header_candidates = (
+ {"Authorization": f"token {token}"},
+ {"Authorization": f"Bearer {token}"},
+ {"X-Gitea-Token": token},
+ {"Authorization": f"token {token}", "X-Gitea-Token": token},
+ )
+ last_error: Exception | None = None
+ for auth_headers in header_candidates:
+ req = Request(
+ url,
+ method="GET",
+ headers={
+ "Accept": "*/*",
+ **auth_headers,
+ },
+ )
+ try:
+ with urlopen(req, timeout=30) as resp:
+ content = resp.read()
+ content_type = resp.headers.get("Content-Type")
+ return content, content_type
+ except HTTPError as error:
+ last_error = error
+ if error.code in {401, 403}:
+ continue
+ raise
+ except URLError as error:
+ last_error = error
+ continue
+ if last_error is not None:
+ raise last_error
+ raise RuntimeError("failed to download attachment")
+
+
+def _normalize_url(raw_url: str, base_url: str) -> str | None:
+ candidate = html.unescape(str(raw_url or "").strip())
+ if not candidate:
+ return None
+ candidate = candidate.strip("<>\"'")
+ if not candidate:
+ return None
+ if candidate.startswith("//"):
+ base_scheme = urlparse(base_url).scheme or "https"
+ candidate = f"{base_scheme}:{candidate}"
+ if candidate.startswith("http://") or candidate.startswith("https://"):
+ return candidate
+ return urljoin(f"{base_url.rstrip('/')}/", candidate)
+
+
+def _asset_to_urls(asset: dict[str, Any], base_url: str) -> list[str]:
+ urls: list[str] = []
+ for key in ("browser_download_url", "download_url", "url", "href", "link"):
+ normalized = _normalize_url(str(asset.get(key) or ""), base_url)
+ if normalized and normalized not in urls:
+ urls.append(normalized)
+ uuid_value = str(asset.get("uuid") or "").strip()
+ if uuid_value:
+ fallback = _normalize_url(f"/attachments/{uuid_value}", base_url)
+ if fallback and fallback not in urls:
+ urls.append(fallback)
+ return urls
+
+
+def _extract_asset_urls(payload: dict[str, Any], base_url: str) -> list[str]:
+ results: list[str] = []
+ for key in ("assets", "attachments"):
+ assets = payload.get(key) or []
+ if not isinstance(assets, list):
+ continue
+ for asset in assets:
+ if not isinstance(asset, dict):
+ continue
+ for url in _asset_to_urls(asset, base_url):
+ if url not in results:
+ results.append(url)
+ return results
+
+
+def _request_json_optional(
+ *,
+ base_url: str,
+ token: str,
+ path: str,
+ query: dict[str, Any] | None = None,
+) -> Any | None:
+ try:
+ return _request_json(base_url, token, path, query=query)
+ except HTTPError as error:
+ if error.code in {401, 403, 404, 405}:
+ return None
+ raise
+ except URLError:
+ return None
+
+
+def _list_asset_urls_from_endpoint(
+ *,
+ base_url: str,
+ token: str,
+ path: str,
+) -> list[str]:
+ urls: list[str] = []
+ page = 1
+ while True:
+ payload = _request_json_optional(
+ base_url=base_url,
+ token=token,
+ path=path,
+ query={"limit": 50, "page": page},
+ )
+ if payload is None:
+ break
+ if not isinstance(payload, list) or not payload:
+ break
+ for asset in payload:
+ if not isinstance(asset, dict):
+ continue
+ for url in _asset_to_urls(asset, base_url):
+ if url not in urls:
+ urls.append(url)
+ if len(payload) < 50:
+ break
+ page += 1
+ return urls
+
+
+def _list_issue_attachment_urls(
+ *,
+ base_url: str,
+ api_root: str,
+ token: str,
+ issue_number: int,
+) -> list[str]:
+ return _list_asset_urls_from_endpoint(
+ base_url=base_url,
+ token=token,
+ path=f"{api_root}/issues/{issue_number}/assets",
+ )
+
+
+def _list_comment_attachment_urls(
+ *,
+ base_url: str,
+ api_root: str,
+ token: str,
+ comment_id: int,
+) -> list[str]:
+ return _list_asset_urls_from_endpoint(
+ base_url=base_url,
+ token=token,
+ path=f"{api_root}/issues/comments/{comment_id}/assets",
+ )
+
+
+def _extract_attachments(text: str, base_url: str) -> list[str]:
+ if not text:
+ return []
+ urls = [
+ *IMG_MD_RE.findall(text),
+ *IMG_HTML_RE.findall(text),
+ *IMG_URL_RE.findall(text),
+ *ATTACHMENT_PATH_RE.findall(text),
+ ]
+ normalized: list[str] = []
+ for url in urls:
+ cleaned = _normalize_url(str(url), base_url)
+ if cleaned:
+ normalized.append(cleaned)
+ return sorted(set(normalized))
+
+
+def _normalize_branch_name(raw_value: str) -> str | None:
+ candidate = str(raw_value or "").strip().strip("`'\"")
+ candidate = re.sub(r"[),.;]+$", "", candidate)
+ if not candidate:
+ return None
+ if len(candidate) > 160:
+ return None
+ if not BRANCH_ALLOWED_RE.fullmatch(candidate):
+ return None
+ return candidate
+
+
+def _extract_branch_hints(text: str) -> list[str]:
+ if not text:
+ return []
+ results: list[str] = []
+ for regex in (BRANCH_LABEL_RE, BRANCH_INLINE_RE):
+ for match in regex.findall(text):
+ branch = _normalize_branch_name(match)
+ if branch and branch not in results:
+ results.append(branch)
+ return results
+
+
+def _pick_issue_branch(body: str, comments: list[dict[str, Any]]) -> str | None:
+ for branch in _extract_branch_hints(body):
+ return branch
+ for comment in reversed(comments):
+ for branch in _extract_branch_hints(str(comment.get("body") or "")):
+ return branch
+ return None
+
+
+def _normalize_for_similarity(text: str) -> str:
+ lowered = text.lower()
+ lowered = re.sub(r"[`*_>#~=\[\](){}:;,.!?/\\|+-]+", " ", lowered)
+ lowered = re.sub(r"\s+", " ", lowered).strip()
+ return lowered
+
+
+def _quality_score(issue: dict[str, Any], attachments: list[str], comments: list[dict[str, Any]]) -> int:
+ title = str(issue.get("title") or "")
+ body = str(issue.get("body") or "")
+ comment_blob = "\n".join(str(item.get("body") or "") for item in comments[:5])
+ text = f"{title}\n{body}\n{comment_blob}"
+
+ score = 0
+ if re.search(
+ r"(期望|expected).{0,24}(实际|actual)|(实际|actual).{0,24}(期望|expected)",
+ text,
+ re.I | re.S,
+ ):
+ score += 20
+ if re.search(r"(复现|步骤|step|how to reproduce|重现)", text, re.I):
+ score += 20
+ if re.search(r"(浏览器|browser|系统|os|版本|version|设备|device|时间)", text, re.I):
+ score += 15
+ if attachments:
+ score += 15
+ if len(title.strip()) >= 6:
+ score += 10
+ if len(re.sub(r"\s+", "", body)) >= 40:
+ score += 20
+ return min(100, score)
+
+
+def _contains_unresolved_feedback(comments: list[dict[str, Any]]) -> bool:
+ for comment in comments:
+ body = str(comment.get("body") or "").lower()
+ if any(keyword in body for keyword in UNRESOLVED_KEYWORDS):
+ return True
+ return False
+
+
+def _issue_similarity(left: IssueEntry, right: IssueEntry) -> float:
+ lhs = _normalize_for_similarity(f"{left.title} {left.body[:700]}")
+ rhs = _normalize_for_similarity(f"{right.title} {right.body[:700]}")
+ if not lhs or not rhs:
+ return 0.0
+ return SequenceMatcher(None, lhs, rhs).ratio()
+
+
+def _title_ngrams(title: str) -> set[str]:
+ normalized = re.sub(r"\s+", "", title.lower())
+ normalized = re.sub(r"[^a-z0-9\u4e00-\u9fff]", "", normalized)
+ grams: set[str] = set()
+ for size in (2, 3):
+ for idx in range(len(normalized) - size + 1):
+ gram = normalized[idx : idx + size]
+ if not gram or gram.isdigit():
+ continue
+ grams.add(gram)
+ return grams
+
+
+def _build_duplicate_groups(entries: list[IssueEntry], threshold: float) -> list[list[int]]:
+ if not entries:
+ return []
+
+ pairs: list[tuple[int, int]] = []
+ numbers = [item.number for item in entries]
+ for i in range(len(entries)):
+ for j in range(i + 1, len(entries)):
+ ratio = _issue_similarity(entries[i], entries[j])
+ if ratio >= threshold:
+ pairs.append((entries[i].number, entries[j].number))
+
+ groups: list[list[int]] = []
+ seen: set[int] = set()
+ graph: dict[int, set[int]] = {}
+ for a, b in pairs:
+ graph.setdefault(a, set()).add(b)
+ graph.setdefault(b, set()).add(a)
+
+ for number in numbers:
+ if number in seen or number not in graph:
+ continue
+ stack = [number]
+ group: list[int] = []
+ while stack:
+ node = stack.pop()
+ if node in seen:
+ continue
+ seen.add(node)
+ group.append(node)
+ stack.extend(graph.get(node, set()))
+ if len(group) > 1:
+ groups.append(sorted(group))
+ return sorted(groups, key=lambda item: item[0])
+
+
+def _build_closed_open_links(
+ closed_entries: list[IssueEntry],
+ open_entries: list[IssueEntry],
+ threshold: float,
+ min_title_ngram_overlap: int,
+) -> list[dict[str, Any]]:
+ links: list[dict[str, Any]] = []
+ for closed_issue in closed_entries:
+ closed_at = _to_datetime(closed_issue.closed_at) or _to_datetime(closed_issue.updated_at)
+ if not closed_at:
+ continue
+
+ best_open: IssueEntry | None = None
+ best_ratio = 0.0
+ best_overlap = 0
+ closed_grams = _title_ngrams(closed_issue.title)
+ for open_issue in open_entries:
+ open_created = _to_datetime(open_issue.created_at)
+ if open_created and open_created < closed_at:
+ continue
+ ratio = _issue_similarity(closed_issue, open_issue)
+ overlap = len(closed_grams & _title_ngrams(open_issue.title))
+ if ratio > best_ratio or (ratio == best_ratio and overlap > best_overlap):
+ best_ratio = ratio
+ best_overlap = overlap
+ best_open = open_issue
+
+ if (
+ best_open
+ and best_ratio >= threshold
+ and best_overlap >= max(1, min_title_ngram_overlap)
+ ):
+ links.append(
+ {
+ "closed_issue": closed_issue.number,
+ "open_issue": best_open.number,
+ "similarity": round(best_ratio, 4),
+ "title_ngram_overlap": best_overlap,
+ }
+ )
+ return sorted(links, key=lambda item: item["closed_issue"])
+
+
+def _load_issues(
+ base_url: str,
+ api_root: str,
+ token: str,
+ state: str,
+ *,
+ fetch_asset_endpoints: bool,
+) -> list[IssueEntry]:
+ states = ["open", "closed"] if state == "all" else [state]
+ collected: list[IssueEntry] = []
+ for target_state in states:
+ page = 1
+ while True:
+ issues = _request_json(
+ base_url,
+ token,
+ f"{api_root}/issues",
+ query={"state": target_state, "limit": 50, "page": page},
+ )
+ if not issues:
+ break
+ for issue in issues:
+ number = int(issue["number"])
+ body = str(issue.get("body") or "")
+ comments = _request_json(
+ base_url,
+ token,
+ f"{api_root}/issues/{number}/comments",
+ query={"limit": 100},
+ )
+ comments = comments or []
+ attachments = _extract_attachments(body, base_url)
+ attachments.extend(_extract_asset_urls(issue, base_url))
+ if fetch_asset_endpoints:
+ attachments.extend(
+ _list_issue_attachment_urls(
+ base_url=base_url,
+ api_root=api_root,
+ token=token,
+ issue_number=number,
+ )
+ )
+ for comment in comments:
+ attachments.extend(_extract_attachments(str(comment.get("body") or ""), base_url))
+ attachments.extend(_extract_asset_urls(comment, base_url))
+ comment_id_raw = comment.get("id")
+ comment_id = (
+ comment_id_raw
+ if isinstance(comment_id_raw, int)
+ else int(comment_id_raw)
+ if isinstance(comment_id_raw, str) and comment_id_raw.isdigit()
+ else None
+ )
+ if fetch_asset_endpoints and comment_id is not None:
+ attachments.extend(
+ _list_comment_attachment_urls(
+ base_url=base_url,
+ api_root=api_root,
+ token=token,
+ comment_id=comment_id,
+ )
+ )
+ attachments = sorted(set(attachments))
+ collected.append(
+ IssueEntry(
+ number=number,
+ state=str(issue.get("state") or target_state),
+ title=str(issue.get("title") or ""),
+ body=body,
+ created_at=str(issue.get("created_at") or ""),
+ updated_at=str(issue.get("updated_at") or ""),
+ closed_at=issue.get("closed_at"),
+ comments=comments,
+ attachments=attachments,
+ quality_score=_quality_score(issue, attachments, comments),
+ target_branch=_pick_issue_branch(body, comments),
+ )
+ )
+ if len(issues) < 50:
+ break
+ page += 1
+ return sorted(collected, key=lambda item: item.number)
+
+
+def _needs_quality_feedback(issue: IssueEntry, min_score: int) -> bool:
+ if issue.state != "open" or issue.quality_score >= min_score:
+ return False
+ for comment in issue.comments:
+ if QUALITY_MARKER in str(comment.get("body") or ""):
+ return False
+ return True
+
+
+def _quality_feedback_message() -> str:
+ return (
+ f"{QUALITY_MARKER}\n"
+ "当前工单暂不满足开发准入标准,已进入待补充队列。请补充以下信息后我们将立即纳入修复流程:\n"
+ "1) 复现步骤(至少 3 步)\n"
+ "2) 期望结果 vs 实际结果\n"
+ "3) 环境信息(浏览器/系统/时间)\n"
+ "4) 截图或录屏(建议标注异常区域)"
+ )
+
+
+def _pick_ext_from_url_or_mime(url: str, content_type: str | None) -> str:
+ parsed = urlparse(url)
+ suffix = Path(parsed.path).suffix.lower().strip()
+ if suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}:
+ return suffix
+
+ normalized = (content_type or "").lower()
+ if "png" in normalized:
+ return ".png"
+ if "jpeg" in normalized or "jpg" in normalized:
+ return ".jpg"
+ if "gif" in normalized:
+ return ".gif"
+ if "webp" in normalized:
+ return ".webp"
+ if "svg" in normalized:
+ return ".svg"
+ return ".bin"
+
+
+def _download_attachments(
+ *,
+ entries: list[IssueEntry],
+ token: str,
+ output_dir: Path,
+ max_per_issue: int,
+) -> list[dict[str, Any]]:
+ output_dir.mkdir(parents=True, exist_ok=True)
+ downloaded: list[dict[str, Any]] = []
+ seen_hashes: set[str] = set()
+
+ for issue in entries:
+ if not issue.attachments:
+ continue
+ for idx, url in enumerate(issue.attachments[:max_per_issue], start=1):
+ digest = hashlib.sha1(url.encode("utf-8")).hexdigest()
+ if digest in seen_hashes:
+ continue
+ seen_hashes.add(digest)
+ try:
+ blob, content_type = _request_binary(url, token)
+ ext = _pick_ext_from_url_or_mime(url, content_type)
+ file_name = f"issue-{issue.number}-{idx}-{digest[:8]}{ext}"
+ local_path = output_dir / file_name
+ local_path.write_bytes(blob)
+ downloaded.append(
+ {
+ "issue": issue.number,
+ "url": url,
+ "path": str(local_path.as_posix()),
+ "size_bytes": len(blob),
+ "content_type": content_type or "",
+ "status": "ok",
+ }
+ )
+ except Exception as error: # noqa: BLE001
+ downloaded.append(
+ {
+ "issue": issue.number,
+ "url": url,
+ "path": "",
+ "size_bytes": 0,
+ "content_type": "",
+ "status": "failed",
+ "error": str(error),
+ }
+ )
+ return downloaded
+
+
+def _render_report(
+ output_path: Path,
+ *,
+ unresolved_closed: list[dict[str, Any]],
+ low_quality_open: list[IssueEntry],
+ duplicate_groups: list[list[int]],
+ closed_open_links: list[dict[str, Any]],
+ downloaded_attachments: list[dict[str, Any]],
+ all_entries: list[IssueEntry],
+) -> None:
+ lines: list[str] = []
+ lines.append("# Issue Audit Report")
+ lines.append("")
+ lines.append(f"- total issues: {len(all_entries)}")
+ lines.append(f"- closed_but_unresolved: {len(unresolved_closed)}")
+ lines.append(f"- open_low_quality: {len(low_quality_open)}")
+ lines.append(f"- duplicate_groups: {len(duplicate_groups)}")
+ lines.append(f"- closed_open_reopen_candidates: {len(closed_open_links)}")
+ issues_with_attachments = [item for item in all_entries if item.attachments]
+ lines.append(f"- issues_with_attachments: {len(issues_with_attachments)}")
+ lines.append(
+ f"- attachment_urls_detected: {sum(len(item.attachments) for item in issues_with_attachments)}"
+ )
+ open_entries = [item for item in all_entries if item.state == "open"]
+ open_with_branch = [item for item in open_entries if item.target_branch]
+ lines.append(f"- open_with_branch_hint: {len(open_with_branch)}/{len(open_entries)}")
+ if downloaded_attachments:
+ ok_count = sum(1 for item in downloaded_attachments if item["status"] == "ok")
+ failed_count = sum(1 for item in downloaded_attachments if item["status"] != "ok")
+ lines.append(f"- attachments_downloaded: {ok_count}/{len(downloaded_attachments)}")
+ lines.append(f"- attachments_download_failed: {failed_count}")
+ lines.append("")
+
+ lines.append("## Closed But Unresolved")
+ if not unresolved_closed:
+ lines.append("- none")
+ else:
+ for item in unresolved_closed:
+ lines.append(
+ f"- #{item['number']} {item['title']} (reason={item['reason']}, related_open={item.get('related_open')}, similarity={item.get('similarity')})"
+ )
+ lines.append("")
+
+ lines.append("## Closed/Open Regression Candidates")
+ if not closed_open_links:
+ lines.append("- none")
+ else:
+ for item in closed_open_links:
+ lines.append(
+ f"- closed #{item['closed_issue']} -> open #{item['open_issue']} (similarity={item['similarity']}, title_overlap={item['title_ngram_overlap']})"
+ )
+ lines.append("")
+
+ lines.append("## Open Low Quality")
+ if not low_quality_open:
+ lines.append("- none")
+ else:
+ for issue in low_quality_open:
+ lines.append(
+ f"- #{issue.number} {issue.title} (score={issue.quality_score}, branch={issue.target_branch or 'missing'}, attachments={len(issue.attachments)})"
+ )
+ lines.append("")
+
+ lines.append("## Open Issue Branch Mapping")
+ if not open_entries:
+ lines.append("- none")
+ else:
+ for issue in open_entries:
+ lines.append(f"- #{issue.number} -> {issue.target_branch or 'missing'}")
+ lines.append("")
+
+ lines.append("## Duplicate Groups (Open)")
+ if not duplicate_groups:
+ lines.append("- none")
+ else:
+ for group in duplicate_groups:
+ lines.append(f"- {', '.join(f'#{num}' for num in group)}")
+ lines.append("")
+
+ if downloaded_attachments:
+ lines.append("## Attachment Download Manifest")
+ for item in downloaded_attachments:
+ lines.append(
+ f"- issue #{item['issue']}: {item['status']} -> {item['path'] or item['url']}"
+ )
+ lines.append("")
+
+ output_path.write_text("\n".join(lines), encoding="utf-8")
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description="Audit Gitea issues for delivery workflow.")
+ parser.add_argument("--base-url", required=True, help="Gitea host, e.g. https://fun-md.com")
+ parser.add_argument("--repo", required=True, help="owner/repo")
+ parser.add_argument("--token", required=True, help="Gitea API token")
+ parser.add_argument("--state", default="all", choices=["open", "closed", "all"])
+ parser.add_argument("--output-dir", default=".tmp/issue-audit")
+ parser.add_argument("--min-quality-score", type=int, default=70)
+ parser.add_argument("--dedupe-threshold", type=float, default=0.62)
+ parser.add_argument("--reopen-similarity-threshold", type=float, default=0.27)
+ parser.add_argument(
+ "--reopen-title-overlap",
+ type=int,
+ default=2,
+ help="Minimum 2/3-char title n-gram overlap for closed/open regression candidates.",
+ )
+ parser.add_argument(
+ "--post-quality-feedback",
+ action="store_true",
+ help="Post needs-info comment for low quality open issues.",
+ )
+ parser.add_argument(
+ "--download-attachments",
+ action="store_true",
+ help="Download image attachments to output-dir/attachments for manual visual review.",
+ )
+ parser.add_argument(
+ "--max-attachments-per-issue",
+ type=int,
+ default=8,
+ help="Limit downloaded attachments per issue to avoid huge sync.",
+ )
+ parser.add_argument(
+ "--skip-asset-endpoints",
+ action="store_true",
+ help="Skip /issues/*/assets API calls and only parse URLs from issue/comment payloads.",
+ )
+ args = parser.parse_args()
+
+ owner, repo_name = args.repo.split("/", 1)
+ api_root = f"/api/v1/repos/{owner}/{repo_name}"
+ entries = _load_issues(
+ args.base_url,
+ api_root,
+ args.token,
+ args.state,
+ fetch_asset_endpoints=not args.skip_asset_endpoints,
+ )
+
+ open_entries = [issue for issue in entries if issue.state == "open"]
+ closed_entries = [issue for issue in entries if issue.state == "closed"]
+ issues_with_attachments = [issue for issue in entries if issue.attachments]
+ open_with_branch = [issue for issue in open_entries if issue.target_branch]
+ open_missing_branch = [issue for issue in open_entries if not issue.target_branch]
+ low_quality_open = [issue for issue in open_entries if issue.quality_score < args.min_quality_score]
+ duplicate_groups = _build_duplicate_groups(open_entries, args.dedupe_threshold)
+ closed_open_links = _build_closed_open_links(
+ closed_entries,
+ open_entries,
+ args.reopen_similarity_threshold,
+ args.reopen_title_overlap,
+ )
+
+ unresolved_closed: list[dict[str, Any]] = []
+
+ for issue in closed_entries:
+ if _contains_unresolved_feedback(issue.comments):
+ unresolved_closed.append(
+ {
+ "number": issue.number,
+ "title": issue.title,
+ "reason": "comment_feedback",
+ }
+ )
+
+ unresolved_closed = sorted(unresolved_closed, key=lambda item: item["number"])
+
+ if args.post_quality_feedback:
+ for issue in low_quality_open:
+ if not _needs_quality_feedback(issue, args.min_quality_score):
+ continue
+ _request_json(
+ args.base_url,
+ args.token,
+ f"{api_root}/issues/{issue.number}/comments",
+ method="POST",
+ body={"body": _quality_feedback_message()},
+ )
+
+ output_dir = Path(args.output_dir)
+ output_dir.mkdir(parents=True, exist_ok=True)
+ downloaded_attachments: list[dict[str, Any]] = []
+ if args.download_attachments:
+ downloaded_attachments = _download_attachments(
+ entries=entries,
+ token=args.token,
+ output_dir=output_dir / "attachments",
+ max_per_issue=max(1, args.max_attachments_per_issue),
+ )
+
+ payload = {
+ "summary": {
+ "total": len(entries),
+ "open": len(open_entries),
+ "closed": len(closed_entries),
+ "closed_but_unresolved": len(unresolved_closed),
+ "open_low_quality": len(low_quality_open),
+ "duplicate_groups": len(duplicate_groups),
+ "closed_open_reopen_candidates": len(closed_open_links),
+ "issues_with_attachments": len(issues_with_attachments),
+ "attachment_urls_detected": sum(len(issue.attachments) for issue in issues_with_attachments),
+ "open_with_branch_hint": len(open_with_branch),
+ "open_missing_branch_hint": len(open_missing_branch),
+ "attachments_downloaded": sum(
+ 1 for item in downloaded_attachments if item.get("status") == "ok"
+ ),
+ "attachments_download_failed": sum(
+ 1 for item in downloaded_attachments if item.get("status") != "ok"
+ ),
+ },
+ "unresolved_closed": unresolved_closed,
+ "closed_open_links": closed_open_links,
+ "open_low_quality": [item.brief() for item in low_quality_open],
+ "open_missing_branch_issues": [item.brief() for item in open_missing_branch],
+ "duplicate_groups": duplicate_groups,
+ "attachments_manifest": downloaded_attachments,
+ "issues": [item.brief() for item in entries],
+ }
+ (output_dir / "issue_audit.json").write_text(
+ json.dumps(payload, ensure_ascii=False, indent=2),
+ encoding="utf-8",
+ )
+ _render_report(
+ output_dir / "issue_audit_report.md",
+ unresolved_closed=unresolved_closed,
+ low_quality_open=low_quality_open,
+ duplicate_groups=duplicate_groups,
+ closed_open_links=closed_open_links,
+ downloaded_attachments=downloaded_attachments,
+ all_entries=entries,
+ )
+ print(json.dumps(payload["summary"], ensure_ascii=False))
+
+
+if __name__ == "__main__":
+ main()
diff --git a/skills/gitea-issue-devops-agent/scripts/preview_slot_allocator.py b/skills/gitea-issue-devops-agent/scripts/preview_slot_allocator.py
new file mode 100644
index 0000000..0ec97b3
--- /dev/null
+++ b/skills/gitea-issue-devops-agent/scripts/preview_slot_allocator.py
@@ -0,0 +1,248 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Any
+
+
+def utc_now() -> datetime:
+ return datetime.now(timezone.utc)
+
+
+def to_iso(value: datetime) -> str:
+ return value.astimezone(timezone.utc).replace(microsecond=0).isoformat()
+
+
+def parse_iso(value: str | None) -> datetime | None:
+ raw = (value or "").strip()
+ if not raw:
+ return None
+ try:
+ return datetime.fromisoformat(raw.replace("Z", "+00:00")).astimezone(timezone.utc)
+ except ValueError:
+ return None
+
+
+def load_state(path: Path) -> dict[str, Any]:
+ if not path.exists():
+ return {"version": 1, "updated_at": to_iso(utc_now()), "allocations": []}
+ try:
+ payload = json.loads(path.read_text(encoding="utf-8"))
+ except json.JSONDecodeError:
+ return {"version": 1, "updated_at": to_iso(utc_now()), "allocations": []}
+ if not isinstance(payload, dict):
+ return {"version": 1, "updated_at": to_iso(utc_now()), "allocations": []}
+ payload.setdefault("version", 1)
+ payload.setdefault("updated_at", to_iso(utc_now()))
+ allocations = payload.get("allocations")
+ if not isinstance(allocations, list):
+ payload["allocations"] = []
+ return payload
+
+
+def save_state(path: Path, payload: dict[str, Any]) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ payload["updated_at"] = to_iso(utc_now())
+ path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+
+
+def parse_slots(raw: str) -> list[str]:
+ slots = [item.strip() for item in raw.split(",") if item.strip()]
+ if not slots:
+ raise ValueError("at least one slot is required")
+ return slots
+
+
+def render_url(slot: str, template: str | None) -> str | None:
+ if not template:
+ return None
+ return template.replace("{slot}", slot)
+
+
+def prune_expired(allocations: list[dict[str, Any]], now: datetime) -> tuple[list[dict[str, Any]], int]:
+ kept: list[dict[str, Any]] = []
+ removed = 0
+ for item in allocations:
+ expires = parse_iso(str(item.get("expires_at") or ""))
+ if expires and expires < now:
+ removed += 1
+ continue
+ kept.append(item)
+ return kept, removed
+
+
+def allocation_sort_key(item: dict[str, Any]) -> datetime:
+ last_seen = parse_iso(str(item.get("last_seen_at") or ""))
+ allocated = parse_iso(str(item.get("allocated_at") or ""))
+ return last_seen or allocated or datetime.fromtimestamp(0, tz=timezone.utc)
+
+
+def output(payload: dict[str, Any]) -> None:
+ print(json.dumps(payload, ensure_ascii=False, indent=2))
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(
+ description="Allocate/reuse/release branch preview slots for issue-driven workflows."
+ )
+ parser.add_argument("--state-file", default=".tmp/preview-slots.json")
+ parser.add_argument("--slots", required=True, help="Comma-separated slot names, e.g. preview-a,preview-b")
+ parser.add_argument("--repo", default="", help="owner/repo")
+ parser.add_argument("--issue", type=int, default=None, help="Issue number")
+ parser.add_argument("--branch", default="", help="Branch name")
+ parser.add_argument("--slot", default="", help="Slot name (optional release filter)")
+ parser.add_argument("--ttl-hours", type=int, default=24, help="Allocation TTL in hours")
+ parser.add_argument("--url-template", default="", help="Optional URL template, use {slot} placeholder")
+ parser.add_argument("--release", action="store_true", help="Release matching allocation(s)")
+ parser.add_argument("--list", action="store_true", help="List active allocations and free slots")
+ parser.add_argument("--evict-oldest", action="store_true", help="Evict oldest allocation when slots are full")
+ args = parser.parse_args()
+
+ state_path = Path(args.state_file)
+ slots = parse_slots(args.slots)
+ state = load_state(state_path)
+
+ now = utc_now()
+ allocations_raw = [item for item in state.get("allocations", []) if isinstance(item, dict)]
+ allocations, pruned_count = prune_expired(allocations_raw, now)
+ state["allocations"] = allocations
+
+ if args.list:
+ used_slots = {str(item.get("slot") or "") for item in allocations}
+ free_slots = [slot for slot in slots if slot not in used_slots]
+ output(
+ {
+ "action": "list",
+ "state_file": str(state_path.as_posix()),
+ "pruned_expired": pruned_count,
+ "total_active": len(allocations),
+ "active_allocations": allocations,
+ "free_slots": free_slots,
+ }
+ )
+ save_state(state_path, state)
+ return
+
+ if args.release:
+ target_repo = args.repo.strip()
+ target_branch = args.branch.strip()
+ target_slot = args.slot.strip()
+ target_issue = args.issue
+
+ kept: list[dict[str, Any]] = []
+ released: list[dict[str, Any]] = []
+ for item in allocations:
+ match = True
+ if target_repo:
+ match = match and str(item.get("repo") or "") == target_repo
+ if target_branch:
+ match = match and str(item.get("branch") or "") == target_branch
+ if target_slot:
+ match = match and str(item.get("slot") or "") == target_slot
+ if target_issue is not None:
+ match = match and int(item.get("issue") or -1) == target_issue
+
+ if match:
+ released.append(item)
+ else:
+ kept.append(item)
+
+ state["allocations"] = kept
+ save_state(state_path, state)
+ output(
+ {
+ "action": "release",
+ "state_file": str(state_path.as_posix()),
+ "released_count": len(released),
+ "released": released,
+ "remaining_count": len(kept),
+ }
+ )
+ return
+
+ repo = args.repo.strip()
+ branch = args.branch.strip()
+ if not repo or not branch:
+ print("allocate requires --repo and --branch", file=sys.stderr)
+ sys.exit(2)
+
+ # Reuse existing allocation for same repo+branch.
+ existing = next(
+ (item for item in allocations if str(item.get("repo") or "") == repo and str(item.get("branch") or "") == branch),
+ None,
+ )
+ if existing is not None:
+ existing["issue"] = args.issue if args.issue is not None else existing.get("issue")
+ existing["last_seen_at"] = to_iso(now)
+ existing["expires_at"] = to_iso(now + timedelta(hours=max(1, args.ttl_hours)))
+ if args.url_template:
+ existing["url"] = render_url(str(existing.get("slot") or ""), args.url_template)
+ save_state(state_path, state)
+ output(
+ {
+ "action": "allocate",
+ "reused": True,
+ "state_file": str(state_path.as_posix()),
+ "allocation": existing,
+ "pruned_expired": pruned_count,
+ }
+ )
+ return
+
+ used_slots = {str(item.get("slot") or "") for item in allocations}
+ free_slots = [slot for slot in slots if slot not in used_slots]
+ evicted: dict[str, Any] | None = None
+
+ if not free_slots:
+ if not args.evict_oldest:
+ output(
+ {
+ "action": "allocate",
+ "reused": False,
+ "allocated": False,
+ "reason": "no_free_slots",
+ "state_file": str(state_path.as_posix()),
+ "active_allocations": allocations,
+ }
+ )
+ sys.exit(3)
+ oldest = sorted(allocations, key=allocation_sort_key)[0]
+ evicted = oldest
+ allocations.remove(oldest)
+ free_slots = [str(oldest.get("slot") or "")]
+
+ slot = free_slots[0]
+ allocation = {
+ "repo": repo,
+ "issue": args.issue,
+ "branch": branch,
+ "slot": slot,
+ "env_id": slot,
+ "url": render_url(slot, args.url_template),
+ "allocated_at": to_iso(now),
+ "last_seen_at": to_iso(now),
+ "expires_at": to_iso(now + timedelta(hours=max(1, args.ttl_hours))),
+ "status": "active",
+ }
+ allocations.append(allocation)
+ state["allocations"] = allocations
+ save_state(state_path, state)
+ output(
+ {
+ "action": "allocate",
+ "reused": False,
+ "allocated": True,
+ "state_file": str(state_path.as_posix()),
+ "allocation": allocation,
+ "evicted": evicted,
+ "pruned_expired": pruned_count,
+ }
+ )
+
+
+if __name__ == "__main__":
+ main()