feat: publish gitea issue devops skill with docs and workflow templates

This commit is contained in:
2026-03-06 22:15:53 +08:00
parent a664b902c4
commit ceb3557dde
10 changed files with 2188 additions and 2 deletions

View File

@@ -0,0 +1,234 @@
---
name: gitea-issue-devops-agent
description: End-to-end Gitea issue delivery workflow with guided onboarding, branch-scoped preview environments, and resource-aware deployment decisions. Use when tasks involve connecting to Gitea, processing text/image issues, fixing code on issue-specified branches, allocating/reusing test environments per branch, running test submission loops, coordinating review approvals, and closing issues only after verified delivery and engineer-confirmed merge.
---
# Gitea Issue DevOps Agent
## Mandatory Guided Start
Run this interaction before any coding or issue action:
1. Ask for repository address:
- preferred: full URL `https://<host>/<owner>/<repo>`
- fallback: `base_url` + `owner/repo`
2. Ask for API key/token with issue read/write permissions.
3. Ask user to select mode:
- `automatic`
- `semi-automatic`
- `manual` (non-automatic)
4. Ask optional defaults:
- designated reviewers (for semi-automatic mode)
- branch test submission entrypoint (CI command/job)
- environment policy:
- stable main URL (`main` fixed test env)
- optional shared QA URL
- preview slot pool (for issue branches), e.g. `preview-a,preview-b`
- preview URL template, e.g. `https://{slot}.qa.example.com`
- deployment environment + health endpoint
- minimum issue quality score (default `70`)
5. Validate connectivity by running:
- `python scripts/issue_audit.py --repo <owner/repo> --base-url <gitea_url> --token <token> --state all --download-attachments --output-dir .tmp/issue-audit`
6. Initialize preview-slot state (if branch previews enabled):
- `python scripts/preview_slot_allocator.py --state-file .tmp/preview-slots.json --slots <slot_csv> --list`
7. Echo back the selected mode and all gate rules, then start work.
If repository or token is missing/invalid, stop and request correction. Never start development without a successful connectivity check.
## Mode Definitions
### 1) Automatic Mode
- Read issue-specified branch and work on that branch.
- Implement fix, run checks, push branch, allocate/reuse branch preview env, and trigger branch test submission automatically.
- Monitor test results and issue feedback, then iterate on the same branch until pass.
- Close issue only after evidence is complete.
- Merge is still blocked until an engineer explicitly confirms merge approval.
### 2) Semi-Automatic Mode
- Read issue-specified branch and work on that branch.
- Implement and push fix.
- Notify designated reviewer with change summary, risk, and test plan.
- Wait for explicit human review approval.
- After approval, allocate/reuse branch preview env, trigger branch test submission and continue loop.
- Close issue only after evidence is complete.
- Merge is still blocked until an engineer explicitly confirms merge approval.
### 3) Manual Mode (Non-Automatic)
Require explicit human confirmation before each major action:
- selecting issue
- confirming target branch
- applying code changes
- pushing commits
- triggering tests/deploy
- closing/reopening issue
- executing merge
No autonomous transition is allowed in manual mode.
## Branch-First Rules
- Treat issue-declared branch as the source of truth.
- Accept branch hints from issue fields/body/comments (example: `branch: feat/login-fix`).
- If branch is missing or ambiguous, ask user/reporter and pause that issue.
- Do not silently switch branches.
- Keep one active issue per branch unless user explicitly approves batching.
## Environment Model (Required)
Always avoid `main` and issue branches overwriting each other.
1. `main` fixed env (stable):
- one permanent URL for regression/baseline testing
2. optional shared QA env:
- integration testing across multiple completed branches
3. issue preview slot env (ephemeral pool):
- small fixed pool (`N` slots, e.g. 2)
- one active branch binds to one slot
- issue comments must include slot + URL + branch
- close/merge/TTL expiry releases slot
Never deploy different branches to the same fixed URL unless user explicitly approves override.
## Issue -> Branch -> Environment Binding
- Binding key: `<repo>#<issue>#<branch>`
- Environment selection:
- if branch already has assigned slot: reuse same slot
- else allocate free slot from pool
- if no free slot:
- in `automatic`: evict oldest expired/inactive slot if policy allows
- in `semi-automatic` / `manual`: request explicit confirmation before eviction
- Persist slot state in `.tmp/preview-slots.json` via `scripts/preview_slot_allocator.py`
## Resource-Aware Deployment Strategy (Required)
Before every branch test submission, detect change scope:
- `python scripts/change_scope.py --repo-path <local_repo> --base-ref <target_base> --head-ref <branch_or_sha>`
Use the scope result to minimize resource usage:
1. `skip` (docs/tests/chore-only):
- do not deploy
- post no-op verification evidence
2. `client_only`:
- build/deploy client only
- reuse existing shared/stable server
- do not start a dedicated server for this branch
3. `server_only`:
- deploy/restart server only
- keep existing client if unchanged
4. `full_stack`:
- deploy both client and server
5. `infra_only`:
- apply infra/workflow changes; restart only required components
Hard rule:
- If server-related scope is unchanged, do not provision/restart dedicated server processes for that issue branch.
## Standard Workflow (All Modes)
### 1) Intake and Prioritization
- Pull issues, comments, and attachments from Gitea API.
- If issue text/comments indicate image evidence but `attachments_downloaded` is `0`, stop and report image-intake failure before coding.
- Prioritize in this order:
- `closed_but_unresolved`
- `open` + `quality_score >= min_quality_score`
- `open` + `quality_score < min_quality_score` (request details first)
- `closed_open_reopen_candidates`
- For issues with images, inspect attachments before coding.
### 2) Deduplication and Quality Gate
- Group issues by semantic intent, not literal wording.
- Keep one parent issue for implementation.
- Use `references/triage-standard.md` for score and comment templates.
- For low-quality issues, request details and mark as `needs-info`.
### 3) Fix Execution
- Prefer small, reversible patches.
- Link every code change to issue ID in commit or PR/MR notes.
- Split cross-cutting work into incremental commits.
### 4) Verification Gate
- Required:
- build/compile passes
- affected unit/integration tests pass
- smoke path for reported scenario passes
- For UI/image issues:
- compare before/after screenshots
- verify in at least one Chromium browser
### 5) Branch Test Submission ("提测")
- Submit testing on the issue branch (CI pipeline + branch preview env).
- Allocate/reuse branch slot before submission.
- Apply resource-aware deployment decision from change scope.
- Post evidence in issue comment:
- commit SHA
- test run URL and result
- environment/slot/URL
- deployment scope (`skip`/`client_only`/`server_only`/`full_stack`/`infra_only`)
- shared backend reused or dedicated backend started
- verification steps
- If fail/reject, iterate on same branch and re-submit.
### 6) Loop Control
- Continue `fix -> test submission -> feedback -> fix` until done.
- Reopen immediately if verification fails or regression appears.
- Do not close based on title-only or assumption-only validation.
### 7) Closure Rule
Close issue only when all are true:
- root cause identified
- fix verified with reproducible evidence
- test submission passed
- closure comment includes commit/test/deploy evidence
### 8) Merge Rule (Always Human-Confirmed)
- Final merge must be approved by an engineer in all modes.
- Agent can prepare merge notes/checklist, but must wait for explicit merge confirmation.
- Merge only after confirmation, then post final release evidence.
### 9) Environment Cleanup
- On issue close/merge:
- release preview slot
- stop branch-only processes (if any)
- keep main/shared env untouched
- On TTL expiry:
- reclaim idle slot automatically (automatic mode) or after confirmation (semi/manual)
## Script Usage
- `scripts/issue_audit.py`: collect issues/comments/attachments, detect duplicates, score quality, detect unresolved/closed-open links, extract issue branch hints, and generate reports.
- image intake uses three sources: markdown/html links, payload `assets/attachments` fields, and `/issues/*/assets` API endpoints.
- if your Gitea blocks the assets endpoints, pass `--skip-asset-endpoints` and rely on payload extraction.
- `scripts/preview_slot_allocator.py`: allocate/reuse/release/list preview slots by issue+branch.
- allocate example:
- `python scripts/preview_slot_allocator.py --state-file .tmp/preview-slots.json --slots preview-a,preview-b --repo <owner/repo> --issue 48 --branch dev --ttl-hours 24 --url-template https://{slot}.qa.example.com`
- release example:
- `python scripts/preview_slot_allocator.py --state-file .tmp/preview-slots.json --slots preview-a,preview-b --release --repo <owner/repo> --issue 48 --branch dev`
- `scripts/change_scope.py`: detect changed scope and recommend minimum deploy strategy.
- `python scripts/change_scope.py --repo-path <repo> --base-ref origin/main --head-ref HEAD`
- `references/triage-standard.md`: scoring rubric and templates for needs-info, review request, test submission, and merge approval.
## Operational Constraints
- Never bulk-close issues without per-issue verification evidence.
- Never ignore attachment images for UI/interaction issues.
- Never merge feature requests and bugfixes into one untraceable commit.
- Never bypass engineer merge confirmation.
- Never allow branch previews to overwrite main stable env.
- Never start dedicated branch server when scope indicates client-only changes.

View File

@@ -0,0 +1,4 @@
interface:
display_name: "Gitea Issue DevOps Agent"
short_description: "Guided Gitea issue delivery with execution modes, branch preview slots, and resource-aware deployments"
default_prompt: "Start with guided setup (repo URL, API key, mode, env policy), process issues on issue-specified branches, bind each branch to preview slots, decide deploy scope from git diff (skip/client-only/server-only/full-stack), avoid dedicated server restarts when backend is unchanged, run fix/test loops, and require engineer confirmation before final merge."

View File

@@ -0,0 +1,140 @@
# Triage Standard
## Quality Score (0-100)
- `+20` Expected vs actual is explicit.
- `+20` Reproduction steps are explicit.
- `+15` Environment is explicit (browser/device/version).
- `+15` Attachment exists (image/video/log).
- `+10` Title is specific enough to infer scope.
- `+20` Body has enough detail for engineering action.
`pass` = score `>= 70`.
## Status Decision
- `closed_but_unresolved`: issue is closed but has clear reporter feedback indicating “still not fixed”.
- `closed_open_reopen_candidate`: issue is closed, but a newer open issue has high title/body similarity and should be manually reviewed for mis-close or regression.
- `ready_for_fix`: open issue, quality pass, reproducible.
- `needs-info`: open issue, quality below threshold.
- `duplicate`: semantically same as another issue.
- `enhancement-epic`: feature scope too large for a single fix cycle; split into sub-issues first.
## Mandatory Review Gates
- `image-first`: when issue body/comments contain screenshots, review the image evidence before coding.
- `image-retrieval-proof`: include `attachment_urls_detected` and `attachments_downloaded` from audit summary to prove image intake worked.
- `real-ai-only`: for AI conversation/generation issues, verify against real provider APIs with valid keys. Do not close based on mock-only behavior.
- `closure-evidence`: close only with commit, test result, deploy proof, and verification path.
## Needs-Info Comment Template
Use this comment when score is below threshold:
```text
[issue-quality-feedback-v1]
当前工单暂不满足开发准入标准,已进入待补充队列。请补充以下信息后我们将立即纳入修复流程:
1) 复现步骤(至少 3 步)
2) 期望结果 vs 实际结果
3) 环境信息(浏览器/系统/时间)
4) 截图或录屏(建议标注异常区域)
```
## Guided Start Template
Use this at session start before any implementation:
```text
[devops-startup-check-v1]
开始处理前请提供:
1) 仓库地址(完整 URL 或 base_url + owner/repo
2) API Key具备 issue 读写权限)
3) 执行模式(三选一):
- automatic自动修复+提测循环,最终合并仍需工程师确认
- semi-automatic修复后先人工 review再提测循环
- manual全流程人工确认
可选:指定 reviewer、提测命令、部署环境、健康检查地址。
可选(推荐):主环境 URL、共享 QA URL、预览槽位池如 preview-a/preview-b和预览 URL 模板。
```
## Review Request Template (Semi-Automatic)
```text
[issue-review-request-v1]
已完成本轮修复,等待指定 reviewer 确认后进入提测:
- issue: #<number>
- branch: <branch>
- commit: <sha>
- change summary: <summary>
- risk: <risk notes>
- test plan: <plan>
请回复“review-approved”或给出修改意见。
```
## Test Submission Template
```text
[issue-test-submit-v1]
已按分支提测:
- issue: #<number>
- branch: <branch>
- commit: <sha>
- pipeline/test run: <url>
- environment: <env/version>
- preview slot: <slot>
- preview url: <url>
- deploy scope: <skip|client_only|server_only|full_stack|infra_only>
- server strategy: <reused-shared|dedicated-branch-server>
- verify steps: <steps>
如失败或结果不符合预期,将继续同分支迭代修复。
```
## Preview Slot Allocation Template
```text
[issue-preview-slot-v1]
已分配提测环境(按 issue+branch 绑定):
- issue: #<number>
- branch: <branch>
- slot: <preview-a|preview-b|...>
- preview url: <url>
- ttl: <hours>
说明:同一分支会复用该 slot关闭/合并后自动释放。
```
## Preview Slot Release Template
```text
[issue-preview-release-v1]
已释放提测环境:
- issue: #<number>
- branch: <branch>
- slot: <slot>
- reason: <merged|closed|ttl-expired|manual-release>
```
## Merge Approval Template
```text
[merge-approval-check-v1]
准备执行最终合并,请工程师确认:
- issue: #<number>
- branch: <branch>
- target: <target branch>
- review status: <approved/pending>
- test status: <passed/failed>
- release evidence: <links>
请明确回复“merge-approved”后再执行合并。
```
## Closure Comment Template
```text
[issue-verified-close-v1]
已修复并发布。
- commit: <sha>
- tests: <summary>
- deploy: <pipeline/run url>
- verify: <how verified>
如仍可复现,请附最新截图和复现步骤,我们将立即 reopen。
```

View File

@@ -0,0 +1,184 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from pathlib import Path
from typing import Any
DOC_EXTS = {".md", ".rst", ".txt", ".adoc"}
TEST_MARKERS = ("/tests/", "/test/", "__tests__", ".spec.", ".test.")
CLIENT_PREFIXES = ("client/", "frontend/", "web/", "ui/")
SERVER_PREFIXES = ("server/", "backend/", "api/", "services/", "worker/")
INFRA_PREFIXES = (
"deploy/",
".gitea/workflows/",
".github/workflows/",
"infra/",
"k8s/",
"helm/",
"nginx/",
)
SHARED_RUNTIME_FILES = {
"package.json",
"package-lock.json",
"pnpm-lock.yaml",
"yarn.lock",
"requirements.txt",
"pyproject.toml",
"poetry.lock",
"Dockerfile",
"docker-compose.yml",
"docker-compose.yaml",
}
def run_git(cwd: Path, *args: str) -> str:
result = subprocess.run(
["git", *args],
cwd=str(cwd),
check=False,
capture_output=True,
text=True,
encoding="utf-8",
)
if result.returncode != 0:
raise RuntimeError(result.stderr.strip() or f"git {' '.join(args)} failed")
return result.stdout.strip()
def normalize_path(raw: str) -> str:
return raw.replace("\\", "/").lstrip("./")
def classify_file(path: str) -> set[str]:
categories: set[str] = set()
normalized = normalize_path(path)
lower = normalized.lower()
suffix = Path(lower).suffix
if lower.startswith("docs/") or suffix in DOC_EXTS:
categories.add("docs")
if any(marker in lower for marker in TEST_MARKERS):
categories.add("tests")
if any(lower.startswith(prefix) for prefix in CLIENT_PREFIXES):
categories.add("client")
if any(lower.startswith(prefix) for prefix in SERVER_PREFIXES):
categories.add("server")
if any(lower.startswith(prefix) for prefix in INFRA_PREFIXES):
categories.add("infra")
if Path(lower).name in SHARED_RUNTIME_FILES:
categories.add("shared_runtime")
if not categories:
categories.add("unknown")
return categories
def main() -> None:
parser = argparse.ArgumentParser(
description="Detect branch change scope and recommend minimum deployment strategy."
)
parser.add_argument("--repo-path", default=".", help="Local git repository path")
parser.add_argument("--base-ref", required=True, help="Base ref, e.g. origin/main")
parser.add_argument("--head-ref", default="HEAD", help="Head ref, e.g. feature branch or SHA")
parser.add_argument("--no-files", action="store_true", help="Do not include changed file list in output")
args = parser.parse_args()
repo_path = Path(args.repo_path).resolve()
try:
merge_base = run_git(repo_path, "merge-base", args.base_ref, args.head_ref)
diff_output = run_git(repo_path, "diff", "--name-only", f"{merge_base}..{args.head_ref}")
except Exception as error: # noqa: BLE001
print(json.dumps({"error": str(error)}, ensure_ascii=False, indent=2))
sys.exit(2)
changed_files = [normalize_path(line) for line in diff_output.splitlines() if line.strip()]
file_classes: list[dict[str, Any]] = []
category_union: set[str] = set()
for path in changed_files:
categories = classify_file(path)
category_union.update(categories)
file_classes.append({"path": path, "categories": sorted(categories)})
has_docs = "docs" in category_union
has_tests = "tests" in category_union
has_client = "client" in category_union
has_server = "server" in category_union
has_infra = "infra" in category_union
has_shared_runtime = "shared_runtime" in category_union
has_unknown = "unknown" in category_union
only_docs_tests = bool(changed_files) and category_union.issubset({"docs", "tests"})
scope = "skip"
reasons: list[str] = []
if not changed_files:
scope = "skip"
reasons.append("no changed files")
elif only_docs_tests:
scope = "skip"
reasons.append("docs/tests-only changes")
else:
require_server = has_server or has_shared_runtime
require_client = has_client or has_shared_runtime
if has_unknown:
scope = "full_stack"
reasons.append("unknown file changes detected -> conservative full-stack deploy")
require_server = True
require_client = True
elif has_infra and not require_server and not require_client:
scope = "infra_only"
reasons.append("infra/workflow-only changes")
elif require_client and not require_server:
scope = "client_only"
reasons.append("client-only changes")
elif require_server and not require_client:
scope = "server_only"
reasons.append("server-only changes")
else:
scope = "full_stack"
reasons.append("client/server/shared runtime changes")
should_restart_server = scope in {"server_only", "full_stack"}
should_restart_client = scope in {"client_only", "full_stack"}
payload: dict[str, Any] = {
"repo_path": str(repo_path.as_posix()),
"base_ref": args.base_ref,
"head_ref": args.head_ref,
"merge_base": merge_base,
"changed_files_count": len(changed_files),
"scope": scope,
"categories": {
"docs": has_docs,
"tests": has_tests,
"client": has_client,
"server": has_server,
"infra": has_infra,
"shared_runtime": has_shared_runtime,
"unknown": has_unknown,
},
"only_docs_tests": only_docs_tests,
"actions": {
"skip_deploy": scope == "skip",
"should_restart_client": should_restart_client,
"should_restart_server": should_restart_server,
"reuse_shared_server": not should_restart_server,
"requires_dedicated_server": should_restart_server,
},
"reasons": reasons,
}
if not args.no_files:
payload["changed_files"] = changed_files
payload["file_classification"] = file_classes
print(json.dumps(payload, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,873 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import html
import hashlib
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urljoin, urlparse
from urllib.request import Request, urlopen
IMG_MD_RE = re.compile(
r"!\[[^\]]*\]\(\s*<?([^\s>)]+)(?:\s+[\"'][^\"']*[\"'])?\s*\)"
)
IMG_HTML_RE = re.compile(r"<img[^>]+src=[\"']([^\"']+)[\"']", re.IGNORECASE)
IMG_URL_RE = re.compile(r"(https?://[^\s)]+?\.(?:png|jpg|jpeg|gif|webp|svg))", re.IGNORECASE)
ATTACHMENT_PATH_RE = re.compile(
r"((?:https?://[^\s)\"'>]+)?/(?:attachments|repo-attachments|api/v1/repos/[^\s)\"'>]+/issues(?:/comments)?/\d+/assets/\d+)[^\s)\"'>]*)",
re.IGNORECASE,
)
UNRESOLVED_KEYWORDS = (
"未修复",
"没有修复",
"问题还在",
"依旧",
"仍然",
"还是",
"无法",
"没解决",
"still not fixed",
"not fixed",
"cannot reproduce? no",
"failed",
"broken",
)
QUALITY_MARKER = "[issue-quality-feedback-v1]"
BRANCH_LABEL_RE = re.compile(
r"(?:^|[\r\n])\s*(?:branch|target branch|working branch|fix branch|分支|目标分支)\s*[:=]\s*`?([A-Za-z0-9._/\-]+)`?",
re.IGNORECASE,
)
BRANCH_INLINE_RE = re.compile(
r"(?:^|[\s,;])(?:/branch|branch)\s+`?([A-Za-z0-9._/\-]+)`?",
re.IGNORECASE,
)
BRANCH_ALLOWED_RE = re.compile(r"^[A-Za-z0-9._/\-]+$")
@dataclass
class IssueEntry:
number: int
state: str
title: str
body: str
created_at: str
updated_at: str
closed_at: str | None
comments: list[dict[str, Any]]
attachments: list[str]
quality_score: int
target_branch: str | None
def brief(self) -> dict[str, Any]:
return {
"number": self.number,
"state": self.state,
"title": self.title,
"quality_score": self.quality_score,
"target_branch": self.target_branch,
"attachments": len(self.attachments),
"created_at": self.created_at,
"updated_at": self.updated_at,
"closed_at": self.closed_at,
}
def _to_datetime(value: str | None) -> datetime | None:
raw = (value or "").strip()
if not raw:
return None
try:
return datetime.fromisoformat(raw.replace("Z", "+00:00"))
except ValueError:
return None
def _request_json(
base_url: str,
token: str,
path: str,
query: dict[str, Any] | None = None,
method: str = "GET",
body: dict[str, Any] | None = None,
) -> Any:
query_str = f"?{urlencode(query)}" if query else ""
url = f"{base_url.rstrip('/')}{path}{query_str}"
payload = None if body is None else json.dumps(body).encode("utf-8")
req = Request(
url,
method=method,
headers={
"Authorization": f"token {token}",
"Content-Type": "application/json",
"Accept": "application/json",
},
data=payload,
)
with urlopen(req, timeout=30) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw) if raw else None
def _request_binary(url: str, token: str) -> tuple[bytes, str | None]:
header_candidates = (
{"Authorization": f"token {token}"},
{"Authorization": f"Bearer {token}"},
{"X-Gitea-Token": token},
{"Authorization": f"token {token}", "X-Gitea-Token": token},
)
last_error: Exception | None = None
for auth_headers in header_candidates:
req = Request(
url,
method="GET",
headers={
"Accept": "*/*",
**auth_headers,
},
)
try:
with urlopen(req, timeout=30) as resp:
content = resp.read()
content_type = resp.headers.get("Content-Type")
return content, content_type
except HTTPError as error:
last_error = error
if error.code in {401, 403}:
continue
raise
except URLError as error:
last_error = error
continue
if last_error is not None:
raise last_error
raise RuntimeError("failed to download attachment")
def _normalize_url(raw_url: str, base_url: str) -> str | None:
candidate = html.unescape(str(raw_url or "").strip())
if not candidate:
return None
candidate = candidate.strip("<>\"'")
if not candidate:
return None
if candidate.startswith("//"):
base_scheme = urlparse(base_url).scheme or "https"
candidate = f"{base_scheme}:{candidate}"
if candidate.startswith("http://") or candidate.startswith("https://"):
return candidate
return urljoin(f"{base_url.rstrip('/')}/", candidate)
def _asset_to_urls(asset: dict[str, Any], base_url: str) -> list[str]:
urls: list[str] = []
for key in ("browser_download_url", "download_url", "url", "href", "link"):
normalized = _normalize_url(str(asset.get(key) or ""), base_url)
if normalized and normalized not in urls:
urls.append(normalized)
uuid_value = str(asset.get("uuid") or "").strip()
if uuid_value:
fallback = _normalize_url(f"/attachments/{uuid_value}", base_url)
if fallback and fallback not in urls:
urls.append(fallback)
return urls
def _extract_asset_urls(payload: dict[str, Any], base_url: str) -> list[str]:
results: list[str] = []
for key in ("assets", "attachments"):
assets = payload.get(key) or []
if not isinstance(assets, list):
continue
for asset in assets:
if not isinstance(asset, dict):
continue
for url in _asset_to_urls(asset, base_url):
if url not in results:
results.append(url)
return results
def _request_json_optional(
*,
base_url: str,
token: str,
path: str,
query: dict[str, Any] | None = None,
) -> Any | None:
try:
return _request_json(base_url, token, path, query=query)
except HTTPError as error:
if error.code in {401, 403, 404, 405}:
return None
raise
except URLError:
return None
def _list_asset_urls_from_endpoint(
*,
base_url: str,
token: str,
path: str,
) -> list[str]:
urls: list[str] = []
page = 1
while True:
payload = _request_json_optional(
base_url=base_url,
token=token,
path=path,
query={"limit": 50, "page": page},
)
if payload is None:
break
if not isinstance(payload, list) or not payload:
break
for asset in payload:
if not isinstance(asset, dict):
continue
for url in _asset_to_urls(asset, base_url):
if url not in urls:
urls.append(url)
if len(payload) < 50:
break
page += 1
return urls
def _list_issue_attachment_urls(
*,
base_url: str,
api_root: str,
token: str,
issue_number: int,
) -> list[str]:
return _list_asset_urls_from_endpoint(
base_url=base_url,
token=token,
path=f"{api_root}/issues/{issue_number}/assets",
)
def _list_comment_attachment_urls(
*,
base_url: str,
api_root: str,
token: str,
comment_id: int,
) -> list[str]:
return _list_asset_urls_from_endpoint(
base_url=base_url,
token=token,
path=f"{api_root}/issues/comments/{comment_id}/assets",
)
def _extract_attachments(text: str, base_url: str) -> list[str]:
if not text:
return []
urls = [
*IMG_MD_RE.findall(text),
*IMG_HTML_RE.findall(text),
*IMG_URL_RE.findall(text),
*ATTACHMENT_PATH_RE.findall(text),
]
normalized: list[str] = []
for url in urls:
cleaned = _normalize_url(str(url), base_url)
if cleaned:
normalized.append(cleaned)
return sorted(set(normalized))
def _normalize_branch_name(raw_value: str) -> str | None:
candidate = str(raw_value or "").strip().strip("`'\"")
candidate = re.sub(r"[),.;]+$", "", candidate)
if not candidate:
return None
if len(candidate) > 160:
return None
if not BRANCH_ALLOWED_RE.fullmatch(candidate):
return None
return candidate
def _extract_branch_hints(text: str) -> list[str]:
if not text:
return []
results: list[str] = []
for regex in (BRANCH_LABEL_RE, BRANCH_INLINE_RE):
for match in regex.findall(text):
branch = _normalize_branch_name(match)
if branch and branch not in results:
results.append(branch)
return results
def _pick_issue_branch(body: str, comments: list[dict[str, Any]]) -> str | None:
for branch in _extract_branch_hints(body):
return branch
for comment in reversed(comments):
for branch in _extract_branch_hints(str(comment.get("body") or "")):
return branch
return None
def _normalize_for_similarity(text: str) -> str:
lowered = text.lower()
lowered = re.sub(r"[`*_>#~=\[\](){}:;,.!?/\\|+-]+", " ", lowered)
lowered = re.sub(r"\s+", " ", lowered).strip()
return lowered
def _quality_score(issue: dict[str, Any], attachments: list[str], comments: list[dict[str, Any]]) -> int:
title = str(issue.get("title") or "")
body = str(issue.get("body") or "")
comment_blob = "\n".join(str(item.get("body") or "") for item in comments[:5])
text = f"{title}\n{body}\n{comment_blob}"
score = 0
if re.search(
r"(期望|expected).{0,24}(实际|actual)|(实际|actual).{0,24}(期望|expected)",
text,
re.I | re.S,
):
score += 20
if re.search(r"(复现|步骤|step|how to reproduce|重现)", text, re.I):
score += 20
if re.search(r"(浏览器|browser|系统|os|版本|version|设备|device|时间)", text, re.I):
score += 15
if attachments:
score += 15
if len(title.strip()) >= 6:
score += 10
if len(re.sub(r"\s+", "", body)) >= 40:
score += 20
return min(100, score)
def _contains_unresolved_feedback(comments: list[dict[str, Any]]) -> bool:
for comment in comments:
body = str(comment.get("body") or "").lower()
if any(keyword in body for keyword in UNRESOLVED_KEYWORDS):
return True
return False
def _issue_similarity(left: IssueEntry, right: IssueEntry) -> float:
lhs = _normalize_for_similarity(f"{left.title} {left.body[:700]}")
rhs = _normalize_for_similarity(f"{right.title} {right.body[:700]}")
if not lhs or not rhs:
return 0.0
return SequenceMatcher(None, lhs, rhs).ratio()
def _title_ngrams(title: str) -> set[str]:
normalized = re.sub(r"\s+", "", title.lower())
normalized = re.sub(r"[^a-z0-9\u4e00-\u9fff]", "", normalized)
grams: set[str] = set()
for size in (2, 3):
for idx in range(len(normalized) - size + 1):
gram = normalized[idx : idx + size]
if not gram or gram.isdigit():
continue
grams.add(gram)
return grams
def _build_duplicate_groups(entries: list[IssueEntry], threshold: float) -> list[list[int]]:
if not entries:
return []
pairs: list[tuple[int, int]] = []
numbers = [item.number for item in entries]
for i in range(len(entries)):
for j in range(i + 1, len(entries)):
ratio = _issue_similarity(entries[i], entries[j])
if ratio >= threshold:
pairs.append((entries[i].number, entries[j].number))
groups: list[list[int]] = []
seen: set[int] = set()
graph: dict[int, set[int]] = {}
for a, b in pairs:
graph.setdefault(a, set()).add(b)
graph.setdefault(b, set()).add(a)
for number in numbers:
if number in seen or number not in graph:
continue
stack = [number]
group: list[int] = []
while stack:
node = stack.pop()
if node in seen:
continue
seen.add(node)
group.append(node)
stack.extend(graph.get(node, set()))
if len(group) > 1:
groups.append(sorted(group))
return sorted(groups, key=lambda item: item[0])
def _build_closed_open_links(
closed_entries: list[IssueEntry],
open_entries: list[IssueEntry],
threshold: float,
min_title_ngram_overlap: int,
) -> list[dict[str, Any]]:
links: list[dict[str, Any]] = []
for closed_issue in closed_entries:
closed_at = _to_datetime(closed_issue.closed_at) or _to_datetime(closed_issue.updated_at)
if not closed_at:
continue
best_open: IssueEntry | None = None
best_ratio = 0.0
best_overlap = 0
closed_grams = _title_ngrams(closed_issue.title)
for open_issue in open_entries:
open_created = _to_datetime(open_issue.created_at)
if open_created and open_created < closed_at:
continue
ratio = _issue_similarity(closed_issue, open_issue)
overlap = len(closed_grams & _title_ngrams(open_issue.title))
if ratio > best_ratio or (ratio == best_ratio and overlap > best_overlap):
best_ratio = ratio
best_overlap = overlap
best_open = open_issue
if (
best_open
and best_ratio >= threshold
and best_overlap >= max(1, min_title_ngram_overlap)
):
links.append(
{
"closed_issue": closed_issue.number,
"open_issue": best_open.number,
"similarity": round(best_ratio, 4),
"title_ngram_overlap": best_overlap,
}
)
return sorted(links, key=lambda item: item["closed_issue"])
def _load_issues(
base_url: str,
api_root: str,
token: str,
state: str,
*,
fetch_asset_endpoints: bool,
) -> list[IssueEntry]:
states = ["open", "closed"] if state == "all" else [state]
collected: list[IssueEntry] = []
for target_state in states:
page = 1
while True:
issues = _request_json(
base_url,
token,
f"{api_root}/issues",
query={"state": target_state, "limit": 50, "page": page},
)
if not issues:
break
for issue in issues:
number = int(issue["number"])
body = str(issue.get("body") or "")
comments = _request_json(
base_url,
token,
f"{api_root}/issues/{number}/comments",
query={"limit": 100},
)
comments = comments or []
attachments = _extract_attachments(body, base_url)
attachments.extend(_extract_asset_urls(issue, base_url))
if fetch_asset_endpoints:
attachments.extend(
_list_issue_attachment_urls(
base_url=base_url,
api_root=api_root,
token=token,
issue_number=number,
)
)
for comment in comments:
attachments.extend(_extract_attachments(str(comment.get("body") or ""), base_url))
attachments.extend(_extract_asset_urls(comment, base_url))
comment_id_raw = comment.get("id")
comment_id = (
comment_id_raw
if isinstance(comment_id_raw, int)
else int(comment_id_raw)
if isinstance(comment_id_raw, str) and comment_id_raw.isdigit()
else None
)
if fetch_asset_endpoints and comment_id is not None:
attachments.extend(
_list_comment_attachment_urls(
base_url=base_url,
api_root=api_root,
token=token,
comment_id=comment_id,
)
)
attachments = sorted(set(attachments))
collected.append(
IssueEntry(
number=number,
state=str(issue.get("state") or target_state),
title=str(issue.get("title") or ""),
body=body,
created_at=str(issue.get("created_at") or ""),
updated_at=str(issue.get("updated_at") or ""),
closed_at=issue.get("closed_at"),
comments=comments,
attachments=attachments,
quality_score=_quality_score(issue, attachments, comments),
target_branch=_pick_issue_branch(body, comments),
)
)
if len(issues) < 50:
break
page += 1
return sorted(collected, key=lambda item: item.number)
def _needs_quality_feedback(issue: IssueEntry, min_score: int) -> bool:
if issue.state != "open" or issue.quality_score >= min_score:
return False
for comment in issue.comments:
if QUALITY_MARKER in str(comment.get("body") or ""):
return False
return True
def _quality_feedback_message() -> str:
return (
f"{QUALITY_MARKER}\n"
"当前工单暂不满足开发准入标准,已进入待补充队列。请补充以下信息后我们将立即纳入修复流程:\n"
"1) 复现步骤(至少 3 步)\n"
"2) 期望结果 vs 实际结果\n"
"3) 环境信息(浏览器/系统/时间)\n"
"4) 截图或录屏(建议标注异常区域)"
)
def _pick_ext_from_url_or_mime(url: str, content_type: str | None) -> str:
parsed = urlparse(url)
suffix = Path(parsed.path).suffix.lower().strip()
if suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}:
return suffix
normalized = (content_type or "").lower()
if "png" in normalized:
return ".png"
if "jpeg" in normalized or "jpg" in normalized:
return ".jpg"
if "gif" in normalized:
return ".gif"
if "webp" in normalized:
return ".webp"
if "svg" in normalized:
return ".svg"
return ".bin"
def _download_attachments(
*,
entries: list[IssueEntry],
token: str,
output_dir: Path,
max_per_issue: int,
) -> list[dict[str, Any]]:
output_dir.mkdir(parents=True, exist_ok=True)
downloaded: list[dict[str, Any]] = []
seen_hashes: set[str] = set()
for issue in entries:
if not issue.attachments:
continue
for idx, url in enumerate(issue.attachments[:max_per_issue], start=1):
digest = hashlib.sha1(url.encode("utf-8")).hexdigest()
if digest in seen_hashes:
continue
seen_hashes.add(digest)
try:
blob, content_type = _request_binary(url, token)
ext = _pick_ext_from_url_or_mime(url, content_type)
file_name = f"issue-{issue.number}-{idx}-{digest[:8]}{ext}"
local_path = output_dir / file_name
local_path.write_bytes(blob)
downloaded.append(
{
"issue": issue.number,
"url": url,
"path": str(local_path.as_posix()),
"size_bytes": len(blob),
"content_type": content_type or "",
"status": "ok",
}
)
except Exception as error: # noqa: BLE001
downloaded.append(
{
"issue": issue.number,
"url": url,
"path": "",
"size_bytes": 0,
"content_type": "",
"status": "failed",
"error": str(error),
}
)
return downloaded
def _render_report(
output_path: Path,
*,
unresolved_closed: list[dict[str, Any]],
low_quality_open: list[IssueEntry],
duplicate_groups: list[list[int]],
closed_open_links: list[dict[str, Any]],
downloaded_attachments: list[dict[str, Any]],
all_entries: list[IssueEntry],
) -> None:
lines: list[str] = []
lines.append("# Issue Audit Report")
lines.append("")
lines.append(f"- total issues: {len(all_entries)}")
lines.append(f"- closed_but_unresolved: {len(unresolved_closed)}")
lines.append(f"- open_low_quality: {len(low_quality_open)}")
lines.append(f"- duplicate_groups: {len(duplicate_groups)}")
lines.append(f"- closed_open_reopen_candidates: {len(closed_open_links)}")
issues_with_attachments = [item for item in all_entries if item.attachments]
lines.append(f"- issues_with_attachments: {len(issues_with_attachments)}")
lines.append(
f"- attachment_urls_detected: {sum(len(item.attachments) for item in issues_with_attachments)}"
)
open_entries = [item for item in all_entries if item.state == "open"]
open_with_branch = [item for item in open_entries if item.target_branch]
lines.append(f"- open_with_branch_hint: {len(open_with_branch)}/{len(open_entries)}")
if downloaded_attachments:
ok_count = sum(1 for item in downloaded_attachments if item["status"] == "ok")
failed_count = sum(1 for item in downloaded_attachments if item["status"] != "ok")
lines.append(f"- attachments_downloaded: {ok_count}/{len(downloaded_attachments)}")
lines.append(f"- attachments_download_failed: {failed_count}")
lines.append("")
lines.append("## Closed But Unresolved")
if not unresolved_closed:
lines.append("- none")
else:
for item in unresolved_closed:
lines.append(
f"- #{item['number']} {item['title']} (reason={item['reason']}, related_open={item.get('related_open')}, similarity={item.get('similarity')})"
)
lines.append("")
lines.append("## Closed/Open Regression Candidates")
if not closed_open_links:
lines.append("- none")
else:
for item in closed_open_links:
lines.append(
f"- closed #{item['closed_issue']} -> open #{item['open_issue']} (similarity={item['similarity']}, title_overlap={item['title_ngram_overlap']})"
)
lines.append("")
lines.append("## Open Low Quality")
if not low_quality_open:
lines.append("- none")
else:
for issue in low_quality_open:
lines.append(
f"- #{issue.number} {issue.title} (score={issue.quality_score}, branch={issue.target_branch or 'missing'}, attachments={len(issue.attachments)})"
)
lines.append("")
lines.append("## Open Issue Branch Mapping")
if not open_entries:
lines.append("- none")
else:
for issue in open_entries:
lines.append(f"- #{issue.number} -> {issue.target_branch or 'missing'}")
lines.append("")
lines.append("## Duplicate Groups (Open)")
if not duplicate_groups:
lines.append("- none")
else:
for group in duplicate_groups:
lines.append(f"- {', '.join(f'#{num}' for num in group)}")
lines.append("")
if downloaded_attachments:
lines.append("## Attachment Download Manifest")
for item in downloaded_attachments:
lines.append(
f"- issue #{item['issue']}: {item['status']} -> {item['path'] or item['url']}"
)
lines.append("")
output_path.write_text("\n".join(lines), encoding="utf-8")
def main() -> None:
parser = argparse.ArgumentParser(description="Audit Gitea issues for delivery workflow.")
parser.add_argument("--base-url", required=True, help="Gitea host, e.g. https://fun-md.com")
parser.add_argument("--repo", required=True, help="owner/repo")
parser.add_argument("--token", required=True, help="Gitea API token")
parser.add_argument("--state", default="all", choices=["open", "closed", "all"])
parser.add_argument("--output-dir", default=".tmp/issue-audit")
parser.add_argument("--min-quality-score", type=int, default=70)
parser.add_argument("--dedupe-threshold", type=float, default=0.62)
parser.add_argument("--reopen-similarity-threshold", type=float, default=0.27)
parser.add_argument(
"--reopen-title-overlap",
type=int,
default=2,
help="Minimum 2/3-char title n-gram overlap for closed/open regression candidates.",
)
parser.add_argument(
"--post-quality-feedback",
action="store_true",
help="Post needs-info comment for low quality open issues.",
)
parser.add_argument(
"--download-attachments",
action="store_true",
help="Download image attachments to output-dir/attachments for manual visual review.",
)
parser.add_argument(
"--max-attachments-per-issue",
type=int,
default=8,
help="Limit downloaded attachments per issue to avoid huge sync.",
)
parser.add_argument(
"--skip-asset-endpoints",
action="store_true",
help="Skip /issues/*/assets API calls and only parse URLs from issue/comment payloads.",
)
args = parser.parse_args()
owner, repo_name = args.repo.split("/", 1)
api_root = f"/api/v1/repos/{owner}/{repo_name}"
entries = _load_issues(
args.base_url,
api_root,
args.token,
args.state,
fetch_asset_endpoints=not args.skip_asset_endpoints,
)
open_entries = [issue for issue in entries if issue.state == "open"]
closed_entries = [issue for issue in entries if issue.state == "closed"]
issues_with_attachments = [issue for issue in entries if issue.attachments]
open_with_branch = [issue for issue in open_entries if issue.target_branch]
open_missing_branch = [issue for issue in open_entries if not issue.target_branch]
low_quality_open = [issue for issue in open_entries if issue.quality_score < args.min_quality_score]
duplicate_groups = _build_duplicate_groups(open_entries, args.dedupe_threshold)
closed_open_links = _build_closed_open_links(
closed_entries,
open_entries,
args.reopen_similarity_threshold,
args.reopen_title_overlap,
)
unresolved_closed: list[dict[str, Any]] = []
for issue in closed_entries:
if _contains_unresolved_feedback(issue.comments):
unresolved_closed.append(
{
"number": issue.number,
"title": issue.title,
"reason": "comment_feedback",
}
)
unresolved_closed = sorted(unresolved_closed, key=lambda item: item["number"])
if args.post_quality_feedback:
for issue in low_quality_open:
if not _needs_quality_feedback(issue, args.min_quality_score):
continue
_request_json(
args.base_url,
args.token,
f"{api_root}/issues/{issue.number}/comments",
method="POST",
body={"body": _quality_feedback_message()},
)
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
downloaded_attachments: list[dict[str, Any]] = []
if args.download_attachments:
downloaded_attachments = _download_attachments(
entries=entries,
token=args.token,
output_dir=output_dir / "attachments",
max_per_issue=max(1, args.max_attachments_per_issue),
)
payload = {
"summary": {
"total": len(entries),
"open": len(open_entries),
"closed": len(closed_entries),
"closed_but_unresolved": len(unresolved_closed),
"open_low_quality": len(low_quality_open),
"duplicate_groups": len(duplicate_groups),
"closed_open_reopen_candidates": len(closed_open_links),
"issues_with_attachments": len(issues_with_attachments),
"attachment_urls_detected": sum(len(issue.attachments) for issue in issues_with_attachments),
"open_with_branch_hint": len(open_with_branch),
"open_missing_branch_hint": len(open_missing_branch),
"attachments_downloaded": sum(
1 for item in downloaded_attachments if item.get("status") == "ok"
),
"attachments_download_failed": sum(
1 for item in downloaded_attachments if item.get("status") != "ok"
),
},
"unresolved_closed": unresolved_closed,
"closed_open_links": closed_open_links,
"open_low_quality": [item.brief() for item in low_quality_open],
"open_missing_branch_issues": [item.brief() for item in open_missing_branch],
"duplicate_groups": duplicate_groups,
"attachments_manifest": downloaded_attachments,
"issues": [item.brief() for item in entries],
}
(output_dir / "issue_audit.json").write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
_render_report(
output_dir / "issue_audit_report.md",
unresolved_closed=unresolved_closed,
low_quality_open=low_quality_open,
duplicate_groups=duplicate_groups,
closed_open_links=closed_open_links,
downloaded_attachments=downloaded_attachments,
all_entries=entries,
)
print(json.dumps(payload["summary"], ensure_ascii=False))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,248 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
def utc_now() -> datetime:
return datetime.now(timezone.utc)
def to_iso(value: datetime) -> str:
return value.astimezone(timezone.utc).replace(microsecond=0).isoformat()
def parse_iso(value: str | None) -> datetime | None:
raw = (value or "").strip()
if not raw:
return None
try:
return datetime.fromisoformat(raw.replace("Z", "+00:00")).astimezone(timezone.utc)
except ValueError:
return None
def load_state(path: Path) -> dict[str, Any]:
if not path.exists():
return {"version": 1, "updated_at": to_iso(utc_now()), "allocations": []}
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {"version": 1, "updated_at": to_iso(utc_now()), "allocations": []}
if not isinstance(payload, dict):
return {"version": 1, "updated_at": to_iso(utc_now()), "allocations": []}
payload.setdefault("version", 1)
payload.setdefault("updated_at", to_iso(utc_now()))
allocations = payload.get("allocations")
if not isinstance(allocations, list):
payload["allocations"] = []
return payload
def save_state(path: Path, payload: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
payload["updated_at"] = to_iso(utc_now())
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def parse_slots(raw: str) -> list[str]:
slots = [item.strip() for item in raw.split(",") if item.strip()]
if not slots:
raise ValueError("at least one slot is required")
return slots
def render_url(slot: str, template: str | None) -> str | None:
if not template:
return None
return template.replace("{slot}", slot)
def prune_expired(allocations: list[dict[str, Any]], now: datetime) -> tuple[list[dict[str, Any]], int]:
kept: list[dict[str, Any]] = []
removed = 0
for item in allocations:
expires = parse_iso(str(item.get("expires_at") or ""))
if expires and expires < now:
removed += 1
continue
kept.append(item)
return kept, removed
def allocation_sort_key(item: dict[str, Any]) -> datetime:
last_seen = parse_iso(str(item.get("last_seen_at") or ""))
allocated = parse_iso(str(item.get("allocated_at") or ""))
return last_seen or allocated or datetime.fromtimestamp(0, tz=timezone.utc)
def output(payload: dict[str, Any]) -> None:
print(json.dumps(payload, ensure_ascii=False, indent=2))
def main() -> None:
parser = argparse.ArgumentParser(
description="Allocate/reuse/release branch preview slots for issue-driven workflows."
)
parser.add_argument("--state-file", default=".tmp/preview-slots.json")
parser.add_argument("--slots", required=True, help="Comma-separated slot names, e.g. preview-a,preview-b")
parser.add_argument("--repo", default="", help="owner/repo")
parser.add_argument("--issue", type=int, default=None, help="Issue number")
parser.add_argument("--branch", default="", help="Branch name")
parser.add_argument("--slot", default="", help="Slot name (optional release filter)")
parser.add_argument("--ttl-hours", type=int, default=24, help="Allocation TTL in hours")
parser.add_argument("--url-template", default="", help="Optional URL template, use {slot} placeholder")
parser.add_argument("--release", action="store_true", help="Release matching allocation(s)")
parser.add_argument("--list", action="store_true", help="List active allocations and free slots")
parser.add_argument("--evict-oldest", action="store_true", help="Evict oldest allocation when slots are full")
args = parser.parse_args()
state_path = Path(args.state_file)
slots = parse_slots(args.slots)
state = load_state(state_path)
now = utc_now()
allocations_raw = [item for item in state.get("allocations", []) if isinstance(item, dict)]
allocations, pruned_count = prune_expired(allocations_raw, now)
state["allocations"] = allocations
if args.list:
used_slots = {str(item.get("slot") or "") for item in allocations}
free_slots = [slot for slot in slots if slot not in used_slots]
output(
{
"action": "list",
"state_file": str(state_path.as_posix()),
"pruned_expired": pruned_count,
"total_active": len(allocations),
"active_allocations": allocations,
"free_slots": free_slots,
}
)
save_state(state_path, state)
return
if args.release:
target_repo = args.repo.strip()
target_branch = args.branch.strip()
target_slot = args.slot.strip()
target_issue = args.issue
kept: list[dict[str, Any]] = []
released: list[dict[str, Any]] = []
for item in allocations:
match = True
if target_repo:
match = match and str(item.get("repo") or "") == target_repo
if target_branch:
match = match and str(item.get("branch") or "") == target_branch
if target_slot:
match = match and str(item.get("slot") or "") == target_slot
if target_issue is not None:
match = match and int(item.get("issue") or -1) == target_issue
if match:
released.append(item)
else:
kept.append(item)
state["allocations"] = kept
save_state(state_path, state)
output(
{
"action": "release",
"state_file": str(state_path.as_posix()),
"released_count": len(released),
"released": released,
"remaining_count": len(kept),
}
)
return
repo = args.repo.strip()
branch = args.branch.strip()
if not repo or not branch:
print("allocate requires --repo and --branch", file=sys.stderr)
sys.exit(2)
# Reuse existing allocation for same repo+branch.
existing = next(
(item for item in allocations if str(item.get("repo") or "") == repo and str(item.get("branch") or "") == branch),
None,
)
if existing is not None:
existing["issue"] = args.issue if args.issue is not None else existing.get("issue")
existing["last_seen_at"] = to_iso(now)
existing["expires_at"] = to_iso(now + timedelta(hours=max(1, args.ttl_hours)))
if args.url_template:
existing["url"] = render_url(str(existing.get("slot") or ""), args.url_template)
save_state(state_path, state)
output(
{
"action": "allocate",
"reused": True,
"state_file": str(state_path.as_posix()),
"allocation": existing,
"pruned_expired": pruned_count,
}
)
return
used_slots = {str(item.get("slot") or "") for item in allocations}
free_slots = [slot for slot in slots if slot not in used_slots]
evicted: dict[str, Any] | None = None
if not free_slots:
if not args.evict_oldest:
output(
{
"action": "allocate",
"reused": False,
"allocated": False,
"reason": "no_free_slots",
"state_file": str(state_path.as_posix()),
"active_allocations": allocations,
}
)
sys.exit(3)
oldest = sorted(allocations, key=allocation_sort_key)[0]
evicted = oldest
allocations.remove(oldest)
free_slots = [str(oldest.get("slot") or "")]
slot = free_slots[0]
allocation = {
"repo": repo,
"issue": args.issue,
"branch": branch,
"slot": slot,
"env_id": slot,
"url": render_url(slot, args.url_template),
"allocated_at": to_iso(now),
"last_seen_at": to_iso(now),
"expires_at": to_iso(now + timedelta(hours=max(1, args.ttl_hours))),
"status": "active",
}
allocations.append(allocation)
state["allocations"] = allocations
save_state(state_path, state)
output(
{
"action": "allocate",
"reused": False,
"allocated": True,
"state_file": str(state_path.as_posix()),
"allocation": allocation,
"evicted": evicted,
"pruned_expired": pruned_count,
}
)
if __name__ == "__main__":
main()