# NOTE(review): the following header lines were residue from a Gitea web file
# view ("Files / 874 lines / 30 KiB / Raw Permalink Blame History" plus the
# ambiguous-Unicode banner) and were not valid Python; kept here as a comment
# for provenance only.
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import html
import hashlib
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urljoin, urlparse
from urllib.request import Request, urlopen
# Markdown image syntax ![alt](url "title") — group 1 captures the URL.
IMG_MD_RE = re.compile(
r"!\[[^\]]*\]\(\s*<?([^\s>)]+)(?:\s+[\"'][^\"']*[\"'])?\s*\)"
)
# HTML <img src="..."> tags — group 1 captures the src attribute.
IMG_HTML_RE = re.compile(r"<img[^>]+src=[\"']([^\"']+)[\"']", re.IGNORECASE)
# Bare absolute URLs ending in a known image extension.
IMG_URL_RE = re.compile(r"(https?://[^\s)]+?\.(?:png|jpg|jpeg|gif|webp|svg))", re.IGNORECASE)
# Gitea attachment paths (absolute or host-relative): /attachments/<uuid>,
# /repo-attachments/..., or the assets API under /api/v1/repos/....
ATTACHMENT_PATH_RE = re.compile(
r"((?:https?://[^\s)\"'>]+)?/(?:attachments|repo-attachments|api/v1/repos/[^\s)\"'>]+/issues(?:/comments)?/\d+/assets/\d+)[^\s)\"'>]*)",
re.IGNORECASE,
)
# Substrings (matched against lowercased comment bodies) that suggest a closed
# issue was not actually resolved (mix of Chinese and English phrasings).
UNRESOLVED_KEYWORDS = (
"未修复",
"没有修复",
"问题还在",
"依旧",
"仍然",
"还是",
"无法",
"没解决",
"still not fixed",
"not fixed",
"cannot reproduce? no",
"failed",
"broken",
)
# Marker embedded in bot-posted comments so the needs-info nag is idempotent.
QUALITY_MARKER = "[issue-quality-feedback-v1]"
# "branch: xyz"-style labelled hints at the start of a line (EN + ZH labels).
BRANCH_LABEL_RE = re.compile(
r"(?:^|[\r\n])\s*(?:branch|target branch|working branch|fix branch|分支|目标分支)\s*[:=]\s*`?([A-Za-z0-9._/\-]+)`?",
re.IGNORECASE,
)
# Inline "/branch xyz" or "branch xyz" mentions.
BRANCH_INLINE_RE = re.compile(
r"(?:^|[\s,;])(?:/branch|branch)\s+`?([A-Za-z0-9._/\-]+)`?",
re.IGNORECASE,
)
# Characters allowed in a branch name hint.
BRANCH_ALLOWED_RE = re.compile(r"^[A-Za-z0-9._/\-]+$")
@dataclass
class IssueEntry:
    """Snapshot of one Gitea issue plus audit metadata derived from it."""

    number: int                      # issue index within the repository
    state: str                       # "open" or "closed"
    title: str
    body: str
    created_at: str                  # raw ISO timestamp strings from the API
    updated_at: str
    closed_at: str | None
    comments: list[dict[str, Any]]   # raw comment payloads
    attachments: list[str]           # normalized, de-duplicated attachment URLs
    quality_score: int               # 0-100 heuristic triage score
    target_branch: str | None        # branch hint parsed from body/comments

    def brief(self) -> dict[str, Any]:
        """Return a compact, JSON-serializable summary of this issue."""
        summary: dict[str, Any] = {
            "number": self.number,
            "state": self.state,
            "title": self.title,
            "quality_score": self.quality_score,
            "target_branch": self.target_branch,
            "attachments": len(self.attachments),
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "closed_at": self.closed_at,
        }
        return summary
def _to_datetime(value: str | None) -> datetime | None:
raw = (value or "").strip()
if not raw:
return None
try:
return datetime.fromisoformat(raw.replace("Z", "+00:00"))
except ValueError:
return None
def _request_json(
    base_url: str,
    token: str,
    path: str,
    query: dict[str, Any] | None = None,
    method: str = "GET",
    body: dict[str, Any] | None = None,
) -> Any:
    """Call a Gitea JSON endpoint and return the decoded payload (None if empty).

    Raises urllib's HTTPError/URLError on failure; callers decide how to handle.
    """
    suffix = f"?{urlencode(query)}" if query else ""
    target = f"{base_url.rstrip('/')}{path}{suffix}"
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    request = Request(target, method=method, headers=headers, data=data)
    with urlopen(request, timeout=30) as response:
        text = response.read().decode("utf-8")
    return json.loads(text) if text else None
def _request_binary(url: str, token: str) -> tuple[bytes, str | None]:
    """Download *url*, trying several auth header conventions in turn.

    Gitea deployments differ in which header they honor for attachment
    downloads, so a 401/403 triggers the next candidate instead of failing.
    Returns (content, content_type); re-raises the last error if all fail.
    """
    candidates = (
        {"Authorization": f"token {token}"},
        {"Authorization": f"Bearer {token}"},
        {"X-Gitea-Token": token},
        {"Authorization": f"token {token}", "X-Gitea-Token": token},
    )
    failure: Exception | None = None
    for extra_headers in candidates:
        request = Request(url, method="GET", headers={"Accept": "*/*", **extra_headers})
        try:
            with urlopen(request, timeout=30) as response:
                return response.read(), response.headers.get("Content-Type")
        except HTTPError as error:
            failure = error
            # Only auth-style rejections warrant trying the next header set.
            if error.code not in {401, 403}:
                raise
        except URLError as error:
            failure = error
    if failure is not None:
        raise failure
    raise RuntimeError("failed to download attachment")
def _normalize_url(raw_url: str, base_url: str) -> str | None:
candidate = html.unescape(str(raw_url or "").strip())
if not candidate:
return None
candidate = candidate.strip("<>\"'")
if not candidate:
return None
if candidate.startswith("//"):
base_scheme = urlparse(base_url).scheme or "https"
candidate = f"{base_scheme}:{candidate}"
if candidate.startswith("http://") or candidate.startswith("https://"):
return candidate
return urljoin(f"{base_url.rstrip('/')}/", candidate)
def _asset_to_urls(asset: dict[str, Any], base_url: str) -> list[str]:
    """Collect every plausible download URL from one asset payload, in order."""
    found: list[str] = []
    for field in ("browser_download_url", "download_url", "url", "href", "link"):
        resolved = _normalize_url(str(asset.get(field) or ""), base_url)
        if resolved and resolved not in found:
            found.append(resolved)
    # Gitea serves attachments under /attachments/<uuid>; synthesize that URL
    # as a fallback when the payload only carries the uuid.
    uuid_text = str(asset.get("uuid") or "").strip()
    if uuid_text:
        synthetic = _normalize_url(f"/attachments/{uuid_text}", base_url)
        if synthetic and synthetic not in found:
            found.append(synthetic)
    return found
def _extract_asset_urls(payload: dict[str, Any], base_url: str) -> list[str]:
    """Pull asset URLs out of an issue/comment payload, de-duplicated in order."""
    collected: list[str] = []
    for field in ("assets", "attachments"):
        entries = payload.get(field) or []
        if not isinstance(entries, list):
            continue
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            for candidate in _asset_to_urls(entry, base_url):
                if candidate not in collected:
                    collected.append(candidate)
    return collected
def _request_json_optional(
    *,
    base_url: str,
    token: str,
    path: str,
    query: dict[str, Any] | None = None,
) -> Any | None:
    """Like _request_json, but returns None for endpoints that are missing,
    forbidden, or unreachable — optional Gitea APIs vary across versions."""
    try:
        return _request_json(base_url, token, path, query=query)
    except HTTPError as error:
        # 404/405: endpoint absent on this Gitea version; 401/403: not permitted.
        if error.code in {401, 403, 404, 405}:
            return None
        raise
    except URLError:
        return None
def _list_asset_urls_from_endpoint(
    *,
    base_url: str,
    token: str,
    path: str,
) -> list[str]:
    """Page through an assets endpoint and collect unique URLs in order."""
    found: list[str] = []
    page = 1
    while True:
        batch = _request_json_optional(
            base_url=base_url,
            token=token,
            path=path,
            query={"limit": 50, "page": page},
        )
        # Stop on missing endpoint, unexpected payload shape, or empty page.
        if not batch or not isinstance(batch, list):
            break
        for item in batch:
            if not isinstance(item, dict):
                continue
            for candidate in _asset_to_urls(item, base_url):
                if candidate not in found:
                    found.append(candidate)
        if len(batch) < 50:
            break
        page += 1
    return found
def _list_issue_attachment_urls(
    *,
    base_url: str,
    api_root: str,
    token: str,
    issue_number: int,
) -> list[str]:
    """URLs of attachments uploaded directly to an issue body."""
    endpoint = f"{api_root}/issues/{issue_number}/assets"
    return _list_asset_urls_from_endpoint(base_url=base_url, token=token, path=endpoint)
def _list_comment_attachment_urls(
    *,
    base_url: str,
    api_root: str,
    token: str,
    comment_id: int,
) -> list[str]:
    """URLs of attachments uploaded to a single issue comment."""
    endpoint = f"{api_root}/issues/comments/{comment_id}/assets"
    return _list_asset_urls_from_endpoint(base_url=base_url, token=token, path=endpoint)
def _extract_attachments(text: str, base_url: str) -> list[str]:
    """Find attachment/image URLs in markdown/HTML text; normalized and sorted."""
    if not text:
        return []
    matches: list[str] = []
    for pattern in (IMG_MD_RE, IMG_HTML_RE, IMG_URL_RE, ATTACHMENT_PATH_RE):
        matches.extend(pattern.findall(text))
    unique = {
        cleaned
        for cleaned in (_normalize_url(str(match), base_url) for match in matches)
        if cleaned
    }
    return sorted(unique)
def _normalize_branch_name(raw_value: str) -> str | None:
    """Validate and clean a branch-name hint; None when it is unusable."""
    candidate = str(raw_value or "").strip().strip("`'\"")
    # Drop trailing punctuation picked up by the loose regex match.
    candidate = re.sub(r"[),.;]+$", "", candidate)
    if not candidate or len(candidate) > 160:
        return None
    return candidate if BRANCH_ALLOWED_RE.fullmatch(candidate) else None
def _extract_branch_hints(text: str) -> list[str]:
    """Branch names mentioned via label or inline syntax, in discovery order."""
    if not text:
        return []
    hints: list[str] = []
    for pattern in (BRANCH_LABEL_RE, BRANCH_INLINE_RE):
        for raw in pattern.findall(text):
            name = _normalize_branch_name(raw)
            if name and name not in hints:
                hints.append(name)
    return hints
def _pick_issue_branch(body: str, comments: list[dict[str, Any]]) -> str | None:
    """Prefer a branch hint from the issue body, else from the newest comment."""
    body_hints = _extract_branch_hints(body)
    if body_hints:
        return body_hints[0]
    for comment in reversed(comments):
        comment_hints = _extract_branch_hints(str(comment.get("body") or ""))
        if comment_hints:
            return comment_hints[0]
    return None
def _normalize_for_similarity(text: str) -> str:
lowered = text.lower()
lowered = re.sub(r"[`*_>#~=\[\](){}:;,.!?/\\|+-]+", " ", lowered)
lowered = re.sub(r"\s+", " ", lowered).strip()
return lowered
def _quality_score(issue: dict[str, Any], attachments: list[str], comments: list[dict[str, Any]]) -> int:
title = str(issue.get("title") or "")
body = str(issue.get("body") or "")
comment_blob = "\n".join(str(item.get("body") or "") for item in comments[:5])
text = f"{title}\n{body}\n{comment_blob}"
score = 0
if re.search(
r"(期望|expected).{0,24}(实际|actual)|(实际|actual).{0,24}(期望|expected)",
text,
re.I | re.S,
):
score += 20
if re.search(r"(复现|步骤|step|how to reproduce|重现)", text, re.I):
score += 20
if re.search(r"(浏览器|browser|系统|os|版本|version|设备|device|时间)", text, re.I):
score += 15
if attachments:
score += 15
if len(title.strip()) >= 6:
score += 10
if len(re.sub(r"\s+", "", body)) >= 40:
score += 20
return min(100, score)
def _contains_unresolved_feedback(comments: list[dict[str, Any]]) -> bool:
    """True when any comment contains a 'still broken' style keyword."""
    lowered_bodies = (str(comment.get("body") or "").lower() for comment in comments)
    return any(
        any(keyword in text for keyword in UNRESOLVED_KEYWORDS)
        for text in lowered_bodies
    )
def _issue_similarity(left: IssueEntry, right: IssueEntry) -> float:
    """Fuzzy similarity (0..1) over title plus the leading 700 chars of body."""
    lhs_text = _normalize_for_similarity(f"{left.title} {left.body[:700]}")
    rhs_text = _normalize_for_similarity(f"{right.title} {right.body[:700]}")
    if lhs_text and rhs_text:
        return SequenceMatcher(None, lhs_text, rhs_text).ratio()
    return 0.0
def _title_ngrams(title: str) -> set[str]:
normalized = re.sub(r"\s+", "", title.lower())
normalized = re.sub(r"[^a-z0-9\u4e00-\u9fff]", "", normalized)
grams: set[str] = set()
for size in (2, 3):
for idx in range(len(normalized) - size + 1):
gram = normalized[idx : idx + size]
if not gram or gram.isdigit():
continue
grams.add(gram)
return grams
def _build_duplicate_groups(entries: list[IssueEntry], threshold: float) -> list[list[int]]:
    """Cluster issues whose pairwise similarity reaches *threshold*.

    Builds an undirected similarity graph over issue numbers, then returns
    its connected components (sorted, singletons dropped).
    """
    if not entries:
        return []
    adjacency: dict[int, set[int]] = {}
    for idx, left in enumerate(entries):
        for right in entries[idx + 1 :]:
            if _issue_similarity(left, right) >= threshold:
                adjacency.setdefault(left.number, set()).add(right.number)
                adjacency.setdefault(right.number, set()).add(left.number)
    # Depth-first traversal to collect components, in entry order.
    groups: list[list[int]] = []
    visited: set[int] = set()
    for entry in entries:
        start = entry.number
        if start in visited or start not in adjacency:
            continue
        component: list[int] = []
        pending = [start]
        while pending:
            node = pending.pop()
            if node in visited:
                continue
            visited.add(node)
            component.append(node)
            pending.extend(adjacency.get(node, set()))
        if len(component) > 1:
            groups.append(sorted(component))
    return sorted(groups, key=lambda group: group[0])
def _build_closed_open_links(
    closed_entries: list[IssueEntry],
    open_entries: list[IssueEntry],
    threshold: float,
    min_title_ngram_overlap: int,
) -> list[dict[str, Any]]:
    """Pair each closed issue with its most similar open issue created afterwards.

    A link is reported only when similarity >= *threshold* AND the titles share
    at least max(1, min_title_ngram_overlap) character n-grams — these are
    "the bug came back" (regression / reopen) candidates.
    """
    links: list[dict[str, Any]] = []
    for closed_issue in closed_entries:
        # Fall back to updated_at when closed_at is missing or unparsable.
        closed_at = _to_datetime(closed_issue.closed_at) or _to_datetime(closed_issue.updated_at)
        if not closed_at:
            continue
        best_open: IssueEntry | None = None
        best_ratio = 0.0
        best_overlap = 0
        closed_grams = _title_ngrams(closed_issue.title)
        for open_issue in open_entries:
            open_created = _to_datetime(open_issue.created_at)
            # Only consider open issues created after the close (reopen pattern);
            # issues with an unparsable created_at are still considered.
            if open_created and open_created < closed_at:
                continue
            ratio = _issue_similarity(closed_issue, open_issue)
            overlap = len(closed_grams & _title_ngrams(open_issue.title))
            # Highest similarity wins; title-n-gram overlap breaks exact ties.
            if ratio > best_ratio or (ratio == best_ratio and overlap > best_overlap):
                best_ratio = ratio
                best_overlap = overlap
                best_open = open_issue
        if (
            best_open
            and best_ratio >= threshold
            and best_overlap >= max(1, min_title_ngram_overlap)
        ):
            links.append(
                {
                    "closed_issue": closed_issue.number,
                    "open_issue": best_open.number,
                    "similarity": round(best_ratio, 4),
                    "title_ngram_overlap": best_overlap,
                }
            )
    return sorted(links, key=lambda item: item["closed_issue"])
def _load_issues(
    base_url: str,
    api_root: str,
    token: str,
    state: str,
    *,
    fetch_asset_endpoints: bool,
) -> list[IssueEntry]:
    """Fetch issues (with comments and attachment URLs) as IssueEntry rows.

    Pages through the issues endpoint per requested state. Fix: comments are
    now paginated as well — the previous single `limit=100` request silently
    truncated issues with more than 100 comments, which skewed quality
    scoring, unresolved-feedback detection, and branch hints.
    """
    states = ["open", "closed"] if state == "all" else [state]
    collected: list[IssueEntry] = []

    def _fetch_comments(issue_number: int) -> list[dict[str, Any]]:
        # Page through all comments; Gitea caps the page size server-side.
        comments: list[dict[str, Any]] = []
        comment_page = 1
        while True:
            batch = _request_json(
                base_url,
                token,
                f"{api_root}/issues/{issue_number}/comments",
                query={"limit": 100, "page": comment_page},
            ) or []
            comments.extend(batch)
            if len(batch) < 100:
                break
            comment_page += 1
        return comments

    def _coerce_comment_id(raw: Any) -> int | None:
        # Comment ids arrive as int or numeric string depending on Gitea version.
        if isinstance(raw, int):
            return raw
        if isinstance(raw, str) and raw.isdigit():
            return int(raw)
        return None

    for target_state in states:
        page = 1
        while True:
            issues = _request_json(
                base_url,
                token,
                f"{api_root}/issues",
                query={"state": target_state, "limit": 50, "page": page},
            )
            if not issues:
                break
            for issue in issues:
                number = int(issue["number"])
                body = str(issue.get("body") or "")
                comments = _fetch_comments(number)
                # Gather attachment URLs from body text, issue payload, and
                # (optionally) the dedicated assets endpoints.
                attachments = _extract_attachments(body, base_url)
                attachments.extend(_extract_asset_urls(issue, base_url))
                if fetch_asset_endpoints:
                    attachments.extend(
                        _list_issue_attachment_urls(
                            base_url=base_url,
                            api_root=api_root,
                            token=token,
                            issue_number=number,
                        )
                    )
                for comment in comments:
                    attachments.extend(
                        _extract_attachments(str(comment.get("body") or ""), base_url)
                    )
                    attachments.extend(_extract_asset_urls(comment, base_url))
                    comment_id = _coerce_comment_id(comment.get("id"))
                    if fetch_asset_endpoints and comment_id is not None:
                        attachments.extend(
                            _list_comment_attachment_urls(
                                base_url=base_url,
                                api_root=api_root,
                                token=token,
                                comment_id=comment_id,
                            )
                        )
                attachments = sorted(set(attachments))
                collected.append(
                    IssueEntry(
                        number=number,
                        state=str(issue.get("state") or target_state),
                        title=str(issue.get("title") or ""),
                        body=body,
                        created_at=str(issue.get("created_at") or ""),
                        updated_at=str(issue.get("updated_at") or ""),
                        closed_at=issue.get("closed_at"),
                        comments=comments,
                        attachments=attachments,
                        quality_score=_quality_score(issue, attachments, comments),
                        target_branch=_pick_issue_branch(body, comments),
                    )
                )
            if len(issues) < 50:
                break
            page += 1
    return sorted(collected, key=lambda item: item.number)
def _needs_quality_feedback(issue: IssueEntry, min_score: int) -> bool:
    """True for open, low-scoring issues the bot has not already commented on."""
    if issue.state != "open":
        return False
    if issue.quality_score >= min_score:
        return False
    # The marker makes the nag idempotent across runs.
    return not any(
        QUALITY_MARKER in str(comment.get("body") or "") for comment in issue.comments
    )
def _quality_feedback_message() -> str:
    """Canned needs-info comment (Chinese), prefixed with the idempotency marker."""
    sections = (
        QUALITY_MARKER,
        "当前工单暂不满足开发准入标准,已进入待补充队列。请补充以下信息后我们将立即纳入修复流程:",
        "1) 复现步骤(至少 3 步)",
        "2) 期望结果 vs 实际结果",
        "3) 环境信息(浏览器/系统/时间)",
        "4) 截图或录屏(建议标注异常区域)",
    )
    return "\n".join(sections)
def _pick_ext_from_url_or_mime(url: str, content_type: str | None) -> str:
parsed = urlparse(url)
suffix = Path(parsed.path).suffix.lower().strip()
if suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg"}:
return suffix
normalized = (content_type or "").lower()
if "png" in normalized:
return ".png"
if "jpeg" in normalized or "jpg" in normalized:
return ".jpg"
if "gif" in normalized:
return ".gif"
if "webp" in normalized:
return ".webp"
if "svg" in normalized:
return ".svg"
return ".bin"
def _download_attachments(
    *,
    entries: list[IssueEntry],
    token: str,
    output_dir: Path,
    max_per_issue: int,
) -> list[dict[str, Any]]:
    """Download up to *max_per_issue* attachments per issue into *output_dir*.

    Returns a manifest row per attempted URL (status "ok" or "failed").
    A URL seen on an earlier issue is fetched only once (sha1-keyed dedupe).
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    manifest: list[dict[str, Any]] = []
    fetched: set[str] = set()
    for issue in entries:
        for position, url in enumerate(issue.attachments[:max_per_issue], start=1):
            digest = hashlib.sha1(url.encode("utf-8")).hexdigest()
            if digest in fetched:
                continue
            fetched.add(digest)
            try:
                blob, content_type = _request_binary(url, token)
                extension = _pick_ext_from_url_or_mime(url, content_type)
                target = output_dir / f"issue-{issue.number}-{position}-{digest[:8]}{extension}"
                target.write_bytes(blob)
                manifest.append(
                    {
                        "issue": issue.number,
                        "url": url,
                        "path": str(target.as_posix()),
                        "size_bytes": len(blob),
                        "content_type": content_type or "",
                        "status": "ok",
                    }
                )
            except Exception as error:  # noqa: BLE001
                # Best-effort: record the failure in the manifest and keep going.
                manifest.append(
                    {
                        "issue": issue.number,
                        "url": url,
                        "path": "",
                        "size_bytes": 0,
                        "content_type": "",
                        "status": "failed",
                        "error": str(error),
                    }
                )
    return manifest
def _render_report(
output_path: Path,
*,
unresolved_closed: list[dict[str, Any]],
low_quality_open: list[IssueEntry],
duplicate_groups: list[list[int]],
closed_open_links: list[dict[str, Any]],
downloaded_attachments: list[dict[str, Any]],
all_entries: list[IssueEntry],
) -> None:
lines: list[str] = []
lines.append("# Issue Audit Report")
lines.append("")
lines.append(f"- total issues: {len(all_entries)}")
lines.append(f"- closed_but_unresolved: {len(unresolved_closed)}")
lines.append(f"- open_low_quality: {len(low_quality_open)}")
lines.append(f"- duplicate_groups: {len(duplicate_groups)}")
lines.append(f"- closed_open_reopen_candidates: {len(closed_open_links)}")
issues_with_attachments = [item for item in all_entries if item.attachments]
lines.append(f"- issues_with_attachments: {len(issues_with_attachments)}")
lines.append(
f"- attachment_urls_detected: {sum(len(item.attachments) for item in issues_with_attachments)}"
)
open_entries = [item for item in all_entries if item.state == "open"]
open_with_branch = [item for item in open_entries if item.target_branch]
lines.append(f"- open_with_branch_hint: {len(open_with_branch)}/{len(open_entries)}")
if downloaded_attachments:
ok_count = sum(1 for item in downloaded_attachments if item["status"] == "ok")
failed_count = sum(1 for item in downloaded_attachments if item["status"] != "ok")
lines.append(f"- attachments_downloaded: {ok_count}/{len(downloaded_attachments)}")
lines.append(f"- attachments_download_failed: {failed_count}")
lines.append("")
lines.append("## Closed But Unresolved")
if not unresolved_closed:
lines.append("- none")
else:
for item in unresolved_closed:
lines.append(
f"- #{item['number']} {item['title']} (reason={item['reason']}, related_open={item.get('related_open')}, similarity={item.get('similarity')})"
)
lines.append("")
lines.append("## Closed/Open Regression Candidates")
if not closed_open_links:
lines.append("- none")
else:
for item in closed_open_links:
lines.append(
f"- closed #{item['closed_issue']} -> open #{item['open_issue']} (similarity={item['similarity']}, title_overlap={item['title_ngram_overlap']})"
)
lines.append("")
lines.append("## Open Low Quality")
if not low_quality_open:
lines.append("- none")
else:
for issue in low_quality_open:
lines.append(
f"- #{issue.number} {issue.title} (score={issue.quality_score}, branch={issue.target_branch or 'missing'}, attachments={len(issue.attachments)})"
)
lines.append("")
lines.append("## Open Issue Branch Mapping")
if not open_entries:
lines.append("- none")
else:
for issue in open_entries:
lines.append(f"- #{issue.number} -> {issue.target_branch or 'missing'}")
lines.append("")
lines.append("## Duplicate Groups (Open)")
if not duplicate_groups:
lines.append("- none")
else:
for group in duplicate_groups:
lines.append(f"- {', '.join(f'#{num}' for num in group)}")
lines.append("")
if downloaded_attachments:
lines.append("## Attachment Download Manifest")
for item in downloaded_attachments:
lines.append(
f"- issue #{item['issue']}: {item['status']} -> {item['path'] or item['url']}"
)
lines.append("")
output_path.write_text("\n".join(lines), encoding="utf-8")
def main() -> None:
    """CLI entry point: audit a Gitea repo's issues, emit JSON + markdown reports.

    Writes issue_audit.json and issue_audit_report.md under --output-dir and
    prints the JSON summary to stdout.
    """
    parser = argparse.ArgumentParser(description="Audit Gitea issues for delivery workflow.")
    parser.add_argument("--base-url", required=True, help="Gitea host, e.g. https://fun-md.com")
    parser.add_argument("--repo", required=True, help="owner/repo")
    parser.add_argument("--token", required=True, help="Gitea API token")
    parser.add_argument("--state", default="all", choices=["open", "closed", "all"])
    parser.add_argument("--output-dir", default=".tmp/issue-audit")
    parser.add_argument("--min-quality-score", type=int, default=70)
    parser.add_argument("--dedupe-threshold", type=float, default=0.62)
    parser.add_argument("--reopen-similarity-threshold", type=float, default=0.27)
    parser.add_argument(
        "--reopen-title-overlap",
        type=int,
        default=2,
        help="Minimum 2/3-char title n-gram overlap for closed/open regression candidates.",
    )
    parser.add_argument(
        "--post-quality-feedback",
        action="store_true",
        help="Post needs-info comment for low quality open issues.",
    )
    parser.add_argument(
        "--download-attachments",
        action="store_true",
        help="Download image attachments to output-dir/attachments for manual visual review.",
    )
    parser.add_argument(
        "--max-attachments-per-issue",
        type=int,
        default=8,
        help="Limit downloaded attachments per issue to avoid huge sync.",
    )
    parser.add_argument(
        "--skip-asset-endpoints",
        action="store_true",
        help="Skip /issues/*/assets API calls and only parse URLs from issue/comment payloads.",
    )
    args = parser.parse_args()
    owner, repo_name = args.repo.split("/", 1)
    api_root = f"/api/v1/repos/{owner}/{repo_name}"
    entries = _load_issues(
        args.base_url,
        api_root,
        args.token,
        args.state,
        fetch_asset_endpoints=not args.skip_asset_endpoints,
    )
    # Partition entries into the audit views the report needs.
    open_entries = [issue for issue in entries if issue.state == "open"]
    closed_entries = [issue for issue in entries if issue.state == "closed"]
    issues_with_attachments = [issue for issue in entries if issue.attachments]
    open_with_branch = [issue for issue in open_entries if issue.target_branch]
    open_missing_branch = [issue for issue in open_entries if not issue.target_branch]
    low_quality_open = [issue for issue in open_entries if issue.quality_score < args.min_quality_score]
    duplicate_groups = _build_duplicate_groups(open_entries, args.dedupe_threshold)
    closed_open_links = _build_closed_open_links(
        closed_entries,
        open_entries,
        args.reopen_similarity_threshold,
        args.reopen_title_overlap,
    )
    # Closed issues whose comment thread contains "still broken" feedback.
    unresolved_closed: list[dict[str, Any]] = []
    for issue in closed_entries:
        if _contains_unresolved_feedback(issue.comments):
            unresolved_closed.append(
                {
                    "number": issue.number,
                    "title": issue.title,
                    "reason": "comment_feedback",
                }
            )
    unresolved_closed = sorted(unresolved_closed, key=lambda item: item["number"])
    # Optionally post the needs-info comment; _needs_quality_feedback keeps
    # this idempotent via the QUALITY_MARKER check.
    if args.post_quality_feedback:
        for issue in low_quality_open:
            if not _needs_quality_feedback(issue, args.min_quality_score):
                continue
            _request_json(
                args.base_url,
                args.token,
                f"{api_root}/issues/{issue.number}/comments",
                method="POST",
                body={"body": _quality_feedback_message()},
            )
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    downloaded_attachments: list[dict[str, Any]] = []
    if args.download_attachments:
        downloaded_attachments = _download_attachments(
            entries=entries,
            token=args.token,
            output_dir=output_dir / "attachments",
            max_per_issue=max(1, args.max_attachments_per_issue),
        )
    # Machine-readable payload; summary is also echoed to stdout below.
    payload = {
        "summary": {
            "total": len(entries),
            "open": len(open_entries),
            "closed": len(closed_entries),
            "closed_but_unresolved": len(unresolved_closed),
            "open_low_quality": len(low_quality_open),
            "duplicate_groups": len(duplicate_groups),
            "closed_open_reopen_candidates": len(closed_open_links),
            "issues_with_attachments": len(issues_with_attachments),
            "attachment_urls_detected": sum(len(issue.attachments) for issue in issues_with_attachments),
            "open_with_branch_hint": len(open_with_branch),
            "open_missing_branch_hint": len(open_missing_branch),
            "attachments_downloaded": sum(
                1 for item in downloaded_attachments if item.get("status") == "ok"
            ),
            "attachments_download_failed": sum(
                1 for item in downloaded_attachments if item.get("status") != "ok"
            ),
        },
        "unresolved_closed": unresolved_closed,
        "closed_open_links": closed_open_links,
        "open_low_quality": [item.brief() for item in low_quality_open],
        "open_missing_branch_issues": [item.brief() for item in open_missing_branch],
        "duplicate_groups": duplicate_groups,
        "attachments_manifest": downloaded_attachments,
        "issues": [item.brief() for item in entries],
    }
    (output_dir / "issue_audit.json").write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    _render_report(
        output_dir / "issue_audit_report.md",
        unresolved_closed=unresolved_closed,
        low_quality_open=low_quality_open,
        duplicate_groups=duplicate_groups,
        closed_open_links=closed_open_links,
        downloaded_attachments=downloaded_attachments,
        all_entries=entries,
    )
    print(json.dumps(payload["summary"], ensure_ascii=False))


if __name__ == "__main__":
    main()