feat: add gitea agentic runtime control plane
This commit is contained in:
73
engine/devops_agent/providers/gitea.py
Normal file
73
engine/devops_agent/providers/gitea.py
Normal file
@@ -0,0 +1,73 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any, Callable
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
|
||||
Transport = Callable[..., dict[str, Any]]
|
||||
|
||||
|
||||
class GiteaProvider:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
base_url: str,
|
||||
token: str,
|
||||
transport: Transport | None = None,
|
||||
) -> None:
|
||||
self.base_url = base_url.rstrip("/")
|
||||
self.token = token
|
||||
self.transport = transport
|
||||
|
||||
def _request(
|
||||
self,
|
||||
*,
|
||||
method: str,
|
||||
path: str,
|
||||
body: dict[str, object] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
url = f"{self.base_url}{path}"
|
||||
headers = {
|
||||
"Authorization": f"token {self.token}",
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
if self.transport is not None:
|
||||
return self.transport(method=method, url=url, headers=headers, body=body)
|
||||
|
||||
payload = None if body is None else json.dumps(body).encode("utf-8")
|
||||
request = Request(url, method=method, headers=headers, data=payload)
|
||||
with urlopen(request, timeout=30) as response:
|
||||
raw = response.read().decode("utf-8")
|
||||
return json.loads(raw) if raw else {}
|
||||
|
||||
def get_issue(self, repo: str, issue_number: int) -> dict[str, Any]:
|
||||
return self._request(
|
||||
method="GET",
|
||||
path=f"/api/v1/repos/{repo}/issues/{issue_number}",
|
||||
)
|
||||
|
||||
def post_issue_comment(
|
||||
self,
|
||||
repo: str,
|
||||
issue_number: int,
|
||||
body: str,
|
||||
) -> dict[str, Any]:
|
||||
return self._request(
|
||||
method="POST",
|
||||
path=f"/api/v1/repos/{repo}/issues/{issue_number}/comments",
|
||||
body={"body": body},
|
||||
)
|
||||
|
||||
def parse_issue_comment_event(self, payload: dict[str, Any]) -> dict[str, Any]:
|
||||
repository = payload.get("repository") or {}
|
||||
issue = payload.get("issue") or {}
|
||||
comment = payload.get("comment") or {}
|
||||
|
||||
return {
|
||||
"repo": repository.get("full_name", ""),
|
||||
"issue_number": int(issue.get("number", 0)),
|
||||
"comment_body": str(comment.get("body", "")),
|
||||
}
|
||||
Reference in New Issue
Block a user