#!/usr/bin/env python3
# Audit Gitea issues for a delivery workflow.
#
# The script loads issues (and their comments) from a Gitea repository,
# scores how actionable each issue is, looks for duplicate open issues and
# closed issues that appear to have been re-reported, optionally downloads
# image attachments, and writes a JSON payload plus a Markdown report.
from __future__ import annotations

import argparse
import hashlib
import html
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from difflib import SequenceMatcher
from itertools import combinations
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urljoin, urlparse
from urllib.request import Request, urlopen

# Markdown image: ![alt](url "title") or ![alt](<url>); group 1 is the URL.
IMG_MD_RE = re.compile(
    r"!\[[^\]]*\]\(\s*<?([^\s<>)]+)>?(?:\s+[\"'][^\"']*[\"'])?\s*\)"
)
# HTML <img ... src="..."> tag; group 1 is the src value.
IMG_HTML_RE = re.compile(r"<img[^>]+src=[\"']([^\"']+)[\"']", re.IGNORECASE)
# Bare absolute URL that ends in a known image extension.
IMG_URL_RE = re.compile(
    r"(https?://[^\s)]+?\.(?:png|jpg|jpeg|gif|webp|svg))", re.IGNORECASE
)
# Gitea attachment paths (absolute or host-relative).
ATTACHMENT_PATH_RE = re.compile(
    r"((?:https?://[^\s)\"'>]+)?/(?:attachments|repo-attachments|api/v1/repos/[^\s)\"'>]+/issues(?:/comments)?/\d+/assets/\d+)[^\s)\"'>]*)",
    re.IGNORECASE,
)
# Substrings (compared against lower-cased comment bodies) that suggest a
# closed issue is not actually resolved.
UNRESOLVED_KEYWORDS = (
    "未修复",
    "没有修复",
    "问题还在",
    "依旧",
    "仍然",
    "还是",
    "无法",
    "没解决",
    "still not fixed",
    "not fixed",
    "cannot reproduce? no",
    "failed",
    "broken",
)
# Marker embedded in the bot comment so feedback is posted at most once.
QUALITY_MARKER = "[issue-quality-feedback-v1]"
# "branch: name" style labels at the start of a line. The separator class
# accepts the ASCII colon, the full-width colon and "=".
BRANCH_LABEL_RE = re.compile(
    r"(?:^|[\r\n])\s*(?:branch|target branch|working branch|fix branch|分支|目标分支)\s*[:：=]\s*`?([A-Za-z0-9._/\-]+)`?",
    re.IGNORECASE,
)
# Inline "/branch name" or "branch name" mentions.
BRANCH_INLINE_RE = re.compile(
    r"(?:^|[\s,;])(?:/branch|branch)\s+`?([A-Za-z0-9._/\-]+)`?",
    re.IGNORECASE,
)
BRANCH_ALLOWED_RE = re.compile(r"^[A-Za-z0-9._/\-]+$")

# URL schemes the attachment downloader is willing to open. "data" is kept
# so inline data-URI images still download; "file" and friends are refused.
_HTTP_SCHEMES = frozenset({"http", "https"})
_DOWNLOAD_SCHEMES = _HTTP_SCHEMES | {"data"}


@dataclass
class IssueEntry:
    """One issue together with everything the audit derived from it."""

    number: int
    state: str
    title: str
    body: str
    created_at: str
    updated_at: str
    closed_at: str | None
    # Raw comment payloads as returned by the comments endpoint.
    comments: list[dict[str, Any]]
    # Sorted, de-duplicated attachment URLs found for the issue.
    attachments: list[str]
    # 0-100 heuristic computed by _quality_score.
    quality_score: int
    # Branch hint picked from the body/comments, if any.
    target_branch: str | None

    def brief(self) -> dict[str, Any]:
        """Return a JSON-friendly summary (attachments reduced to a count)."""
        return {
            "number": self.number,
            "state": self.state,
            "title": self.title,
            "quality_score": self.quality_score,
            "target_branch": self.target_branch,
            "attachments": len(self.attachments),
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "closed_at": self.closed_at,
        }


def _to_datetime(value: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    Returns None for empty or unparseable input. A trailing "Z" is accepted,
    and timestamps without an offset are treated as UTC so that every value
    returned here can be compared with every other one (mixing naive and
    aware datetimes raises TypeError).
    """
    raw = (value or "").strip()
    if not raw:
        return None
    try:
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _request_json(
    base_url: str,
    token: str,
    path: str,
    query: dict[str, Any] | None = None,
    method: str = "GET",
    body: dict[str, Any] | None = None,
) -> Any:
    """Call a JSON endpoint under *base_url* and return the decoded payload.

    Returns None when the response body is empty. HTTPError / URLError from
    urllib propagate to the caller.
    """
    query_str = f"?{urlencode(query)}" if query else ""
    url = f"{base_url.rstrip('/')}{path}{query_str}"
    payload = None if body is None else json.dumps(body).encode("utf-8")
    req = Request(
        url,
        method=method,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        data=payload,
    )
    with urlopen(req, timeout=30) as resp:
        raw = resp.read().decode("utf-8")
    return json.loads(raw) if raw else None


def _is_same_host(url: str, base_url: str) -> bool:
    """Return True when *url* points at the same host[:port] as *base_url*."""
    target = urlparse(url).netloc.lower()
    return bool(target) and target == urlparse(base_url).netloc.lower()


def _request_binary(
    url: str,
    token: str,
    *,
    send_auth: bool = True,
) -> tuple[bytes, str | None]:
    """Download *url* and return ``(content, content_type)``.

    With ``send_auth`` (the default) several authentication header styles
    are tried in turn, moving on to the next after a 401/403 or a URLError.
    With ``send_auth=False`` a single unauthenticated request is made; use
    this for URLs on hosts that must not see the API token.

    Raises:
        ValueError: the URL scheme is not http, https or data. Attachment
            URLs come from user-written issue text, so e.g. ``file://`` must
            not be opened.
        HTTPError / URLError: the last error seen when every attempt failed.
    """
    scheme = urlparse(url).scheme.lower()
    if scheme not in _DOWNLOAD_SCHEMES:
        raise ValueError(f"unsupported attachment URL scheme: {scheme or '(none)'}")

    header_candidates: tuple[dict[str, str], ...]
    if send_auth and scheme in _HTTP_SCHEMES:
        header_candidates = (
            {"Authorization": f"token {token}"},
            {"Authorization": f"Bearer {token}"},
            {"X-Gitea-Token": token},
            {"Authorization": f"token {token}", "X-Gitea-Token": token},
        )
    else:
        header_candidates = ({},)

    last_error: Exception | None = None
    for auth_headers in header_candidates:
        req = Request(url, method="GET", headers={"Accept": "*/*", **auth_headers})
        try:
            with urlopen(req, timeout=30) as resp:
                return resp.read(), resp.headers.get("Content-Type")
        except HTTPError as error:
            # HTTPError is a URLError subclass, so it must be handled first.
            last_error = error
            if error.code in {401, 403}:
                continue
            raise
        except URLError as error:
            last_error = error
            continue
    if last_error is not None:
        raise last_error
    raise RuntimeError("failed to download attachment")


def _normalize_url(raw_url: str, base_url: str) -> str | None:
    """Turn a URL-ish string into an absolute URL, or None if it is empty.

    HTML entities are unescaped, surrounding ``<>"'`` are stripped,
    protocol-relative URLs inherit the scheme of *base_url* (https when it
    has none) and everything else is resolved against *base_url*.
    """
    candidate = html.unescape(str(raw_url or "").strip())
    if not candidate:
        return None
    candidate = candidate.strip("<>\"'")
    if not candidate:
        return None
    if candidate.startswith("//"):
        base_scheme = urlparse(base_url).scheme or "https"
        candidate = f"{base_scheme}:{candidate}"
    if candidate.startswith("http://") or candidate.startswith("https://"):
        return candidate
    return urljoin(f"{base_url.rstrip('/')}/", candidate)


def _asset_to_urls(asset: dict[str, Any], base_url: str) -> list[str]:
    """Return the distinct candidate download URLs of one asset payload.

    The known URL-bearing keys are tried in order; when the asset has a
    ``uuid`` a ``/attachments/<uuid>`` fallback is appended.
    """
    urls: list[str] = []
    for key in ("browser_download_url", "download_url", "url", "href", "link"):
        normalized = _normalize_url(str(asset.get(key) or ""), base_url)
        if normalized and normalized not in urls:
            urls.append(normalized)
    uuid_value = str(asset.get("uuid") or "").strip()
    if uuid_value:
        fallback = _normalize_url(f"/attachments/{uuid_value}", base_url)
        if fallback and fallback not in urls:
            urls.append(fallback)
    return urls


def _extract_asset_urls(payload: dict[str, Any], base_url: str) -> list[str]:
    """Collect asset URLs from the ``assets``/``attachments`` lists of a payload."""
    results: list[str] = []
    for key in ("assets", "attachments"):
        assets = payload.get(key) or []
        if not isinstance(assets, list):
            continue
        for asset in assets:
            if not isinstance(asset, dict):
                continue
            for url in _asset_to_urls(asset, base_url):
                if url not in results:
                    results.append(url)
    return results


def _request_json_optional(
    *,
    base_url: str,
    token: str,
    path: str,
    query: dict[str, Any] | None = None,
) -> Any | None:
    """Like _request_json (GET), but return None when the endpoint is unusable.

    401/403/404/405 responses and URLError are mapped to None; any other
    HTTPError is re-raised.
    """
    try:
        return _request_json(base_url, token, path, query=query)
    except HTTPError as error:
        if error.code in {401, 403, 404, 405}:
            return None
        raise
    except URLError:
        return None


def _list_asset_urls_from_endpoint(
    *,
    base_url: str,
    token: str,
    path: str,
) -> list[str]:
    """Page through an assets endpoint and return the distinct asset URLs.

    Stops on an unavailable endpoint, a non-list or empty page, or a page
    shorter than the requested page size of 50.
    """
    page_size = 50
    urls: list[str] = []
    page = 1
    while True:
        payload = _request_json_optional(
            base_url=base_url,
            token=token,
            path=path,
            query={"limit": page_size, "page": page},
        )
        if not isinstance(payload, list) or not payload:
            break
        for asset in payload:
            if not isinstance(asset, dict):
                continue
            for url in _asset_to_urls(asset, base_url):
                if url not in urls:
                    urls.append(url)
        if len(payload) < page_size:
            break
        page += 1
    return urls


def _list_issue_attachment_urls(
    *,
    base_url: str,
    api_root: str,
    token: str,
    issue_number: int,
) -> list[str]:
    """Return asset URLs listed by the issue's ``/assets`` endpoint."""
    return _list_asset_urls_from_endpoint(
        base_url=base_url,
        token=token,
        path=f"{api_root}/issues/{issue_number}/assets",
    )


def _list_comment_attachment_urls(
    *,
    base_url: str,
    api_root: str,
    token: str,
    comment_id: int,
) -> list[str]:
    """Return asset URLs listed by the comment's ``/assets`` endpoint."""
    return _list_asset_urls_from_endpoint(
        base_url=base_url,
        token=token,
        path=f"{api_root}/issues/comments/{comment_id}/assets",
    )


def _extract_attachments(text: str, base_url: str) -> list[str]:
    """Return the sorted, distinct attachment/image URLs mentioned in *text*."""
    if not text:
        return []
    urls = [
        *IMG_MD_RE.findall(text),
        *IMG_HTML_RE.findall(text),
        *IMG_URL_RE.findall(text),
        *ATTACHMENT_PATH_RE.findall(text),
    ]
    normalized: list[str] = []
    for url in urls:
        cleaned = _normalize_url(str(url), base_url)
        if cleaned:
            normalized.append(cleaned)
    return sorted(set(normalized))


def _normalize_branch_name(raw_value: str) -> str | None:
    """Clean a captured branch name; None when empty, too long or invalid.

    Surrounding quotes/backticks and trailing ``),.;`` punctuation are
    removed; names longer than 160 characters are rejected.
    """
    candidate = str(raw_value or "").strip().strip("`'\"")
    candidate = re.sub(r"[),.;]+$", "", candidate)
    if not candidate:
        return None
    if len(candidate) > 160:
        return None
    if not BRANCH_ALLOWED_RE.fullmatch(candidate):
        return None
    return candidate


def _extract_branch_hints(text: str) -> list[str]:
    """Return distinct branch names hinted at in *text*, labels before inline."""
    if not text:
        return []
    results: list[str] = []
    for regex in (BRANCH_LABEL_RE, BRANCH_INLINE_RE):
        for match in regex.findall(text):
            branch = _normalize_branch_name(match)
            if branch and branch not in results:
                results.append(branch)
    return results


def _pick_issue_branch(body: str, comments: list[dict[str, Any]]) -> str | None:
    """Pick the branch for an issue.

    The first hint in the issue body wins; otherwise the first hint of the
    last comment (in list order) that contains one. None when nothing hints
    at a branch.
    """
    hints = _extract_branch_hints(body)
    if hints:
        return hints[0]
    for comment in reversed(comments):
        hints = _extract_branch_hints(str(comment.get("body") or ""))
        if hints:
            return hints[0]
    return None


def _normalize_for_similarity(text: str) -> str:
    """Lower-case *text*, replace markup punctuation by spaces, squeeze whitespace."""
    lowered = text.lower()
    lowered = re.sub(r"[`*_>#~=\[\](){}:;,.!?/\\|+-]+", " ", lowered)
    lowered = re.sub(r"\s+", " ", lowered).strip()
    return lowered


def _quality_score(
    issue: dict[str, Any],
    attachments: list[str],
    comments: list[dict[str, Any]],
) -> int:
    """Score (0-100) how actionable an issue report is.

    Points: expected-vs-actual wording 20, reproduction steps 20,
    environment info 15, any attachment 15, title of >= 6 characters 10,
    body of >= 40 non-whitespace characters 20. Only the title, body and
    the first five comments are inspected.
    """
    title = str(issue.get("title") or "")
    body = str(issue.get("body") or "")
    comment_blob = "\n".join(str(item.get("body") or "") for item in comments[:5])
    text = f"{title}\n{body}\n{comment_blob}"

    score = 0
    if re.search(
        r"(期望|expected).{0,24}(实际|actual)|(实际|actual).{0,24}(期望|expected)",
        text,
        re.I | re.S,
    ):
        score += 20
    if re.search(r"(复现|步骤|step|how to reproduce|重现)", text, re.I):
        score += 20
    # "os" needs word boundaries: as a bare substring it also matches
    # "close", "post", "those", ... and would award the points to almost
    # any English text.
    if re.search(
        r"(浏览器|browser|系统|\bos\b|版本|version|设备|device|时间)", text, re.I
    ):
        score += 15
    if attachments:
        score += 15
    if len(title.strip()) >= 6:
        score += 10
    if len(re.sub(r"\s+", "", body)) >= 40:
        score += 20
    return min(100, score)


def _contains_unresolved_feedback(comments: list[dict[str, Any]]) -> bool:
    """Return True when any comment contains one of UNRESOLVED_KEYWORDS."""
    for comment in comments:
        body = str(comment.get("body") or "").lower()
        if any(keyword in body for keyword in UNRESOLVED_KEYWORDS):
            return True
    return False


def _issue_similarity(left: IssueEntry, right: IssueEntry) -> float:
    """Return the SequenceMatcher ratio of title + first 700 body characters.

    0.0 when either side normalizes to an empty string.
    """
    lhs = _normalize_for_similarity(f"{left.title} {left.body[:700]}")
    rhs = _normalize_for_similarity(f"{right.title} {right.body[:700]}")
    if not lhs or not rhs:
        return 0.0
    return SequenceMatcher(None, lhs, rhs).ratio()


def _title_ngrams(title: str) -> set[str]:
    """Return the 2- and 3-character n-grams of a normalized title.

    The title is lower-cased and reduced to ``a-z``, digits and characters
    in U+4E00..U+9FFF; purely numeric n-grams are dropped.
    """
    normalized = re.sub(r"\s+", "", title.lower())
    normalized = re.sub(r"[^a-z0-9\u4e00-\u9fff]", "", normalized)
    grams: set[str] = set()
    for size in (2, 3):
        for idx in range(len(normalized) - size + 1):
            gram = normalized[idx : idx + size]
            if not gram or gram.isdigit():
                continue
            grams.add(gram)
    return grams


def _build_duplicate_groups(
    entries: list[IssueEntry],
    threshold: float,
) -> list[list[int]]:
    """Group issues whose pairwise similarity reaches *threshold*.

    Similar pairs form an undirected graph; each connected component with
    more than one member becomes a group of sorted issue numbers. Groups
    are ordered by their smallest number.
    """
    if not entries:
        return []

    graph: dict[int, set[int]] = {}
    for left, right in combinations(entries, 2):
        if _issue_similarity(left, right) >= threshold:
            graph.setdefault(left.number, set()).add(right.number)
            graph.setdefault(right.number, set()).add(left.number)

    groups: list[list[int]] = []
    seen: set[int] = set()
    for entry in entries:
        number = entry.number
        if number in seen or number not in graph:
            continue
        # Depth-first walk over the component that contains `number`.
        stack = [number]
        group: list[int] = []
        while stack:
            node = stack.pop()
            if node in seen:
                continue
            seen.add(node)
            group.append(node)
            stack.extend(graph.get(node, set()))
        if len(group) > 1:
            groups.append(sorted(group))
    return sorted(groups, key=lambda item: item[0])


def _build_closed_open_links(
    closed_entries: list[IssueEntry],
    open_entries: list[IssueEntry],
    threshold: float,
    min_title_ngram_overlap: int,
) -> list[dict[str, Any]]:
    """Link each closed issue to the most similar open issue filed afterwards.

    For a closed issue (using ``closed_at``, falling back to ``updated_at``)
    the best candidate among open issues not created before the close time
    is the one with the highest similarity, ties broken by title n-gram
    overlap. The link is reported only when that candidate reaches
    *threshold* and an overlap of at least ``max(1, min_title_ngram_overlap)``.
    Closed issues without a usable timestamp are skipped.
    """
    # Timestamps and n-grams of open issues do not depend on the closed
    # issue, so compute them once instead of once per pair.
    open_info = [
        (issue, _to_datetime(issue.created_at), _title_ngrams(issue.title))
        for issue in open_entries
    ]
    required_overlap = max(1, min_title_ngram_overlap)

    links: list[dict[str, Any]] = []
    for closed_issue in closed_entries:
        closed_at = _to_datetime(closed_issue.closed_at) or _to_datetime(
            closed_issue.updated_at
        )
        if not closed_at:
            continue
        best_open: IssueEntry | None = None
        best_ratio = 0.0
        best_overlap = 0
        closed_grams = _title_ngrams(closed_issue.title)
        for open_issue, open_created, open_grams in open_info:
            if open_created and open_created < closed_at:
                continue
            ratio = _issue_similarity(closed_issue, open_issue)
            overlap = len(closed_grams & open_grams)
            if ratio > best_ratio or (ratio == best_ratio and overlap > best_overlap):
                best_ratio = ratio
                best_overlap = overlap
                best_open = open_issue
        if best_open and best_ratio >= threshold and best_overlap >= required_overlap:
            links.append(
                {
                    "closed_issue": closed_issue.number,
                    "open_issue": best_open.number,
                    "similarity": round(best_ratio, 4),
                    "title_ngram_overlap": best_overlap,
                }
            )
    return sorted(links, key=lambda item: item["closed_issue"])


def _coerce_comment_id(raw: Any) -> int | None:
    """Return a comment id as int when it is an int or a digit-only string."""
    if isinstance(raw, int):
        return raw
    if isinstance(raw, str) and raw.isdigit():
        return int(raw)
    return None


def _collect_issue_attachments(
    issue: dict[str, Any],
    comments: list[dict[str, Any]],
    *,
    base_url: str,
    api_root: str,
    token: str,
    fetch_asset_endpoints: bool,
) -> list[str]:
    """Gather every attachment URL of an issue and its comments (sorted, unique)."""
    found = _extract_attachments(str(issue.get("body") or ""), base_url)
    found.extend(_extract_asset_urls(issue, base_url))
    if fetch_asset_endpoints:
        found.extend(
            _list_issue_attachment_urls(
                base_url=base_url,
                api_root=api_root,
                token=token,
                issue_number=int(issue["number"]),
            )
        )
    for comment in comments:
        found.extend(_extract_attachments(str(comment.get("body") or ""), base_url))
        found.extend(_extract_asset_urls(comment, base_url))
        comment_id = _coerce_comment_id(comment.get("id"))
        if fetch_asset_endpoints and comment_id is not None:
            found.extend(
                _list_comment_attachment_urls(
                    base_url=base_url,
                    api_root=api_root,
                    token=token,
                    comment_id=comment_id,
                )
            )
    return sorted(set(found))


def _load_issues(
    base_url: str,
    api_root: str,
    token: str,
    state: str,
    *,
    fetch_asset_endpoints: bool,
) -> list[IssueEntry]:
    """Fetch issues in *state* ("open", "closed" or "all") with their comments.

    Issues are requested 50 per page until an empty or short page is seen.
    For every issue the comments are fetched and attachments, quality score
    and branch hint are derived. The result is sorted by issue number.
    """
    page_size = 50
    states = ["open", "closed"] if state == "all" else [state]
    collected: list[IssueEntry] = []
    for target_state in states:
        page = 1
        while True:
            issues = _request_json(
                base_url,
                token,
                f"{api_root}/issues",
                query={"state": target_state, "limit": page_size, "page": page},
            )
            if not issues:
                break
            for issue in issues:
                number = int(issue["number"])
                body = str(issue.get("body") or "")
                comments = (
                    _request_json(
                        base_url,
                        token,
                        f"{api_root}/issues/{number}/comments",
                        query={"limit": 100},
                    )
                    or []
                )
                attachments = _collect_issue_attachments(
                    issue,
                    comments,
                    base_url=base_url,
                    api_root=api_root,
                    token=token,
                    fetch_asset_endpoints=fetch_asset_endpoints,
                )
                collected.append(
                    IssueEntry(
                        number=number,
                        state=str(issue.get("state") or target_state),
                        title=str(issue.get("title") or ""),
                        body=body,
                        created_at=str(issue.get("created_at") or ""),
                        updated_at=str(issue.get("updated_at") or ""),
                        closed_at=issue.get("closed_at"),
                        comments=comments,
                        attachments=attachments,
                        quality_score=_quality_score(issue, attachments, comments),
                        target_branch=_pick_issue_branch(body, comments),
                    )
                )
            if len(issues) < page_size:
                break
            page += 1
    return sorted(collected, key=lambda item: item.number)


def _needs_quality_feedback(issue: IssueEntry, min_score: int) -> bool:
    """Return True for an open, below-threshold issue not yet given feedback.

    "Already given" means some comment contains QUALITY_MARKER.
    """
    if issue.state != "open" or issue.quality_score >= min_score:
        return False
    for comment in issue.comments:
        if QUALITY_MARKER in str(comment.get("body") or ""):
            return False
    return True


def _quality_feedback_message() -> str:
    """Return the needs-info comment body, prefixed with QUALITY_MARKER."""
    return (
        f"{QUALITY_MARKER}\n"
        "当前工单暂不满足开发准入标准,已进入待补充队列。请补充以下信息后我们将立即纳入修复流程:\n"
        "1) 复现步骤(至少 3 步)\n"
        "2) 期望结果 vs 实际结果\n"
        "3) 环境信息(浏览器/系统/时间)\n"
        "4) 截图或录屏(建议标注异常区域)"
    )


def _pick_ext_from_url_or_mime(url: str, content_type: str | None) -> str:
    """Choose a file extension for a downloaded attachment.

    A known image suffix in the URL path wins; otherwise the Content-Type
    is searched for an image type name; ``.bin`` is the fallback.
    """
    parsed = urlparse(url)
    suffix = Path(parsed.path).suffix.lower().strip()
    if suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}:
        return suffix
    normalized = (content_type or "").lower()
    if "png" in normalized:
        return ".png"
    if "jpeg" in normalized or "jpg" in normalized:
        return ".jpg"
    if "gif" in normalized:
        return ".gif"
    if "webp" in normalized:
        return ".webp"
    if "svg" in normalized:
        return ".svg"
    return ".bin"


def _download_attachments(
    *,
    entries: list[IssueEntry],
    token: str,
    output_dir: Path,
    max_per_issue: int,
    base_url: str | None = None,
) -> list[dict[str, Any]]:
    """Download up to *max_per_issue* attachments per issue into *output_dir*.

    Each distinct URL is attempted once across all issues. Failures are
    recorded in the manifest (``status == "failed"`` plus ``error``) instead
    of aborting the run.

    When *base_url* is given, the API token is only sent to URLs on that
    host; other hosts get an unauthenticated request, because attachment
    URLs are taken from user-written issue text. Without *base_url* the
    token is sent with every HTTP(S) request.

    Returns one manifest dict per attempted URL.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    downloaded: list[dict[str, Any]] = []
    seen_hashes: set[str] = set()

    for issue in entries:
        if not issue.attachments:
            continue
        for idx, url in enumerate(issue.attachments[:max_per_issue], start=1):
            # The digest only identifies the URL (dedupe + file name).
            digest = hashlib.sha1(url.encode("utf-8")).hexdigest()
            if digest in seen_hashes:
                continue
            seen_hashes.add(digest)
            send_auth = base_url is None or _is_same_host(url, base_url)
            try:
                blob, content_type = _request_binary(url, token, send_auth=send_auth)
                ext = _pick_ext_from_url_or_mime(url, content_type)
                file_name = f"issue-{issue.number}-{idx}-{digest[:8]}{ext}"
                local_path = output_dir / file_name
                local_path.write_bytes(blob)
                downloaded.append(
                    {
                        "issue": issue.number,
                        "url": url,
                        "path": str(local_path.as_posix()),
                        "size_bytes": len(blob),
                        "content_type": content_type or "",
                        "status": "ok",
                    }
                )
            except Exception as error:  # noqa: BLE001
                # Deliberately best-effort: one bad attachment must not
                # stop the audit; the failure is kept in the manifest.
                downloaded.append(
                    {
                        "issue": issue.number,
                        "url": url,
                        "path": "",
                        "size_bytes": 0,
                        "content_type": "",
                        "status": "failed",
                        "error": str(error),
                    }
                )
    return downloaded


def _render_report(
    output_path: Path,
    *,
    unresolved_closed: list[dict[str, Any]],
    low_quality_open: list[IssueEntry],
    duplicate_groups: list[list[int]],
    closed_open_links: list[dict[str, Any]],
    downloaded_attachments: list[dict[str, Any]],
    all_entries: list[IssueEntry],
) -> None:
    """Write the Markdown audit report to *output_path* (UTF-8)."""
    issues_with_attachments = [item for item in all_entries if item.attachments]
    open_entries = [item for item in all_entries if item.state == "open"]
    open_with_branch = [item for item in open_entries if item.target_branch]

    lines: list[str] = [
        "# Issue Audit Report",
        "",
        f"- total issues: {len(all_entries)}",
        f"- closed_but_unresolved: {len(unresolved_closed)}",
        f"- open_low_quality: {len(low_quality_open)}",
        f"- duplicate_groups: {len(duplicate_groups)}",
        f"- closed_open_reopen_candidates: {len(closed_open_links)}",
        f"- issues_with_attachments: {len(issues_with_attachments)}",
        f"- attachment_urls_detected: {sum(len(item.attachments) for item in issues_with_attachments)}",
        f"- open_with_branch_hint: {len(open_with_branch)}/{len(open_entries)}",
    ]
    if downloaded_attachments:
        ok_count = sum(1 for item in downloaded_attachments if item["status"] == "ok")
        failed_count = sum(1 for item in downloaded_attachments if item["status"] != "ok")
        lines.append(f"- attachments_downloaded: {ok_count}/{len(downloaded_attachments)}")
        lines.append(f"- attachments_download_failed: {failed_count}")
    lines.append("")

    lines.append("## Closed But Unresolved")
    if not unresolved_closed:
        lines.append("- none")
    else:
        for item in unresolved_closed:
            lines.append(
                f"- #{item['number']} {item['title']} (reason={item['reason']}, "
                f"related_open={item.get('related_open')}, similarity={item.get('similarity')})"
            )
    lines.append("")

    lines.append("## Closed/Open Regression Candidates")
    if not closed_open_links:
        lines.append("- none")
    else:
        for item in closed_open_links:
            lines.append(
                f"- closed #{item['closed_issue']} -> open #{item['open_issue']} "
                f"(similarity={item['similarity']}, title_overlap={item['title_ngram_overlap']})"
            )
    lines.append("")

    lines.append("## Open Low Quality")
    if not low_quality_open:
        lines.append("- none")
    else:
        for issue in low_quality_open:
            lines.append(
                f"- #{issue.number} {issue.title} (score={issue.quality_score}, "
                f"branch={issue.target_branch or 'missing'}, attachments={len(issue.attachments)})"
            )
    lines.append("")

    lines.append("## Open Issue Branch Mapping")
    if not open_entries:
        lines.append("- none")
    else:
        for issue in open_entries:
            lines.append(f"- #{issue.number} -> {issue.target_branch or 'missing'}")
    lines.append("")

    lines.append("## Duplicate Groups (Open)")
    if not duplicate_groups:
        lines.append("- none")
    else:
        for group in duplicate_groups:
            lines.append(f"- {', '.join(f'#{num}' for num in group)}")
    lines.append("")

    if downloaded_attachments:
        lines.append("## Attachment Download Manifest")
        for item in downloaded_attachments:
            lines.append(
                f"- issue #{item['issue']}: {item['status']} -> {item['path'] or item['url']}"
            )
        lines.append("")

    output_path.write_text("\n".join(lines), encoding="utf-8")


def main() -> None:
    """Parse CLI arguments, run the audit and write the JSON/Markdown outputs.

    Writes ``issue_audit.json`` and ``issue_audit_report.md`` into
    ``--output-dir`` and prints the summary as one JSON line.
    """
    parser = argparse.ArgumentParser(description="Audit Gitea issues for delivery workflow.")
    parser.add_argument("--base-url", required=True, help="Gitea host, e.g. https://fun-md.com")
    parser.add_argument("--repo", required=True, help="owner/repo")
    parser.add_argument("--token", required=True, help="Gitea API token")
    parser.add_argument("--state", default="all", choices=["open", "closed", "all"])
    parser.add_argument("--output-dir", default=".tmp/issue-audit")
    parser.add_argument("--min-quality-score", type=int, default=70)
    parser.add_argument("--dedupe-threshold", type=float, default=0.62)
    parser.add_argument("--reopen-similarity-threshold", type=float, default=0.27)
    parser.add_argument(
        "--reopen-title-overlap",
        type=int,
        default=2,
        help="Minimum 2/3-char title n-gram overlap for closed/open regression candidates.",
    )
    parser.add_argument(
        "--post-quality-feedback",
        action="store_true",
        help="Post needs-info comment for low quality open issues.",
    )
    parser.add_argument(
        "--download-attachments",
        action="store_true",
        help="Download image attachments to output-dir/attachments for manual visual review.",
    )
    parser.add_argument(
        "--max-attachments-per-issue",
        type=int,
        default=8,
        help="Limit downloaded attachments per issue to avoid huge sync.",
    )
    parser.add_argument(
        "--skip-asset-endpoints",
        action="store_true",
        help="Skip /issues/*/assets API calls and only parse URLs from issue/comment payloads.",
    )
    args = parser.parse_args()

    # Report a malformed --repo as a usage error instead of crashing with
    # an unpacking ValueError.
    owner, separator, repo_name = args.repo.partition("/")
    if not (separator and owner and repo_name):
        parser.error("--repo must be given as owner/repo")
    api_root = f"/api/v1/repos/{owner}/{repo_name}"

    entries = _load_issues(
        args.base_url,
        api_root,
        args.token,
        args.state,
        fetch_asset_endpoints=not args.skip_asset_endpoints,
    )

    open_entries = [issue for issue in entries if issue.state == "open"]
    closed_entries = [issue for issue in entries if issue.state == "closed"]
    issues_with_attachments = [issue for issue in entries if issue.attachments]
    open_with_branch = [issue for issue in open_entries if issue.target_branch]
    open_missing_branch = [issue for issue in open_entries if not issue.target_branch]
    low_quality_open = [
        issue for issue in open_entries if issue.quality_score < args.min_quality_score
    ]
    duplicate_groups = _build_duplicate_groups(open_entries, args.dedupe_threshold)
    closed_open_links = _build_closed_open_links(
        closed_entries,
        open_entries,
        args.reopen_similarity_threshold,
        args.reopen_title_overlap,
    )

    unresolved_closed: list[dict[str, Any]] = [
        {
            "number": issue.number,
            "title": issue.title,
            "reason": "comment_feedback",
        }
        for issue in closed_entries
        if _contains_unresolved_feedback(issue.comments)
    ]
    unresolved_closed = sorted(unresolved_closed, key=lambda item: item["number"])

    if args.post_quality_feedback:
        for issue in low_quality_open:
            if not _needs_quality_feedback(issue, args.min_quality_score):
                continue
            _request_json(
                args.base_url,
                args.token,
                f"{api_root}/issues/{issue.number}/comments",
                method="POST",
                body={"body": _quality_feedback_message()},
            )

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    downloaded_attachments: list[dict[str, Any]] = []
    if args.download_attachments:
        downloaded_attachments = _download_attachments(
            entries=entries,
            token=args.token,
            output_dir=output_dir / "attachments",
            max_per_issue=max(1, args.max_attachments_per_issue),
            base_url=args.base_url,
        )

    payload = {
        "summary": {
            "total": len(entries),
            "open": len(open_entries),
            "closed": len(closed_entries),
            "closed_but_unresolved": len(unresolved_closed),
            "open_low_quality": len(low_quality_open),
            "duplicate_groups": len(duplicate_groups),
            "closed_open_reopen_candidates": len(closed_open_links),
            "issues_with_attachments": len(issues_with_attachments),
            "attachment_urls_detected": sum(
                len(issue.attachments) for issue in issues_with_attachments
            ),
            "open_with_branch_hint": len(open_with_branch),
            "open_missing_branch_hint": len(open_missing_branch),
            "attachments_downloaded": sum(
                1 for item in downloaded_attachments if item.get("status") == "ok"
            ),
            "attachments_download_failed": sum(
                1 for item in downloaded_attachments if item.get("status") != "ok"
            ),
        },
        "unresolved_closed": unresolved_closed,
        "closed_open_links": closed_open_links,
        "open_low_quality": [item.brief() for item in low_quality_open],
        "open_missing_branch_issues": [item.brief() for item in open_missing_branch],
        "duplicate_groups": duplicate_groups,
        "attachments_manifest": downloaded_attachments,
        "issues": [item.brief() for item in entries],
    }

    (output_dir / "issue_audit.json").write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    _render_report(
        output_dir / "issue_audit_report.md",
        unresolved_closed=unresolved_closed,
        low_quality_open=low_quality_open,
        duplicate_groups=duplicate_groups,
        closed_open_links=closed_open_links,
        downloaded_attachments=downloaded_attachments,
        all_entries=entries,
    )
    print(json.dumps(payload["summary"], ensure_ascii=False))


if __name__ == "__main__":
    main()