patx/gitman
gitman.io - hglab.io fork for git
Commit 5f85899 · patx · 2026-05-05T13:14:28-04:00
Comments
No comments yet.
Diff
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..4c3bdb5
--- /dev/null
+++ b/README.md
@@ -0,0 +1,50 @@
+# GitMan
+
+A small Bottle app that hosts public Git repositories with local user accounts.
+
+Features include public user profiles, repository browsing, commit/branch diffs with comments, README rendering, issues, pull requests, repository contributors, stars, and HTTP Git clone/fetch/push etc.
+
+## Run locally
+
+```sh
+python3 -m pip install -r requirements.txt
+SECRET_KEY=change-me python3 app.py
+```
+
+Then open `http://127.0.0.1:8080`, create an account, and create a repository.
+
+## Git client use
+
+Clone and fetch are public:
+
+```sh
+git clone http://127.0.0.1:8080/git/<user>/<repo>
+```
+
+Push requires the repository owner's or a contributor's username and password:
+
+```sh
+git push http://<user>@127.0.0.1:8080/git/<user>/<repo>
+```
+
+## Configuration
+
+- `SECRET_KEY`: Bottle signed-cookie secret.
+- `GITMAN_DB`: SQLite database path. Defaults to `./data/gitman.sqlite3`.
+- `GITMAN_REPO_ROOT`: Git repository root. Defaults to `./data/repos`.
+- `GITMAN_DEBUG`: set to `1` for Bottle debug/reloader.
+- `GITMAN_MAX_FORM_BYTES`: maximum browser form POST size. Defaults to `65536`.
+- `GITMAN_MAX_RENDER_BYTES`: maximum README/file/diff preview size. Defaults to `262144`.
+- `GITMAN_MAX_GIT_RESPONSE_BYTES`: maximum buffered Git HTTP response size. Defaults to `268435456`.
+- `GITMAN_GIT_BINARY`: Git executable name or full path. Defaults to `git`.
+- `GITMAN_RATE_LIMIT_ENABLED`: set to `0` to disable in-memory login/signup/git auth throttling.
+- `GITMAN_RATE_LIMIT_MAX_FAILURES`: failed attempts before throttling. Defaults to `5`.
+- `GITMAN_RATE_LIMIT_WINDOW_SECONDS`: rate limit window. Defaults to `300`.
+- `GITMAN_RATE_LIMIT_COOLDOWN_SECONDS`: throttle duration. Defaults to `300`.
+- `PORT`: HTTP port. Defaults to `8080`.
+
+When `GITMAN_DEBUG` is disabled, `SECRET_KEY` must be set to a non-default value before the app starts.
+
+This v1 stores repositories on local disk. Do not deploy it to ephemeral filesystems unless repository storage is mounted persistently.
+
+SQLite is configured with WAL mode and a busy timeout so a small multi-worker deployment can share one database file. Keep `GITMAN_DB` on a local persistent filesystem used by one host; network or synced filesystems can break SQLite locking semantics and should use a server database instead.
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..31afb29
--- /dev/null
+++ b/app.py
@@ -0,0 +1,3452 @@
+import base64
+import datetime as dt
+import hashlib
+import hmac
+import html
+import mimetypes
+import os
+import re
+import secrets
+import shutil
+import sqlite3
+import subprocess
+import time
+import tempfile
+from socketserver import ThreadingMixIn
+from pathlib import Path, PurePosixPath
+from urllib.parse import parse_qsl, quote, unquote, urlencode, urlparse
+from wsgiref.simple_server import WSGIServer
+
+import bleach
+import markdown
+from bottle import (
+ Bottle,
+ HTTPResponse,
+ TEMPLATE_PATH,
+ abort,
+ redirect,
+ request,
+ response,
+ run,
+ static_file,
+ template,
+)
+
+
+def env_bool(name, default=False):
+ value = os.environ.get(name)
+ if value is None:
+ return default
+ return value.lower() in {"1", "true", "yes", "on"}
+
+
+def env_int(name, default, minimum=0):
+ try:
+ value = int(os.environ.get(name, str(default)))
+ except ValueError:
+ return default
+ return max(minimum, value)
+
+
+BASE_DIR = Path(__file__).resolve().parent
+DATA_DIR = BASE_DIR / "data"
+DB_PATH = Path(os.environ.get("GITMAN_DB", DATA_DIR / "gitman.sqlite3"))
+REPO_ROOT = Path(os.environ.get("GITMAN_REPO_ROOT", DATA_DIR / "repos"))
+DEFAULT_SECRET_KEY = "dev-secret-change-me"
+SECRET_KEY = os.environ.get("SECRET_KEY", DEFAULT_SECRET_KEY)
+DEBUG = env_bool("GITMAN_DEBUG")
+PASSWORD_ITERATIONS = 260_000
+SQLITE_BUSY_TIMEOUT_MS = 30_000
+MAX_FORM_BYTES = env_int("GITMAN_MAX_FORM_BYTES", 64 * 1024)
+MAX_RENDER_BYTES = env_int("GITMAN_MAX_RENDER_BYTES", 256 * 1024)
+MAX_GIT_RESPONSE_BYTES = env_int("GITMAN_MAX_GIT_RESPONSE_BYTES", 256 * 1024 * 1024)
+GIT_BINARY = os.environ.get("GITMAN_GIT_BINARY", "git")
+DEFAULT_EXEC_PATH = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
+RATE_LIMIT_ENABLED = env_bool("GITMAN_RATE_LIMIT_ENABLED", True)
+RATE_LIMIT_MAX_FAILURES = env_int("GITMAN_RATE_LIMIT_MAX_FAILURES", 5, minimum=1)
+RATE_LIMIT_WINDOW_SECONDS = env_int("GITMAN_RATE_LIMIT_WINDOW_SECONDS", 300, minimum=1)
+RATE_LIMIT_COOLDOWN_SECONDS = env_int("GITMAN_RATE_LIMIT_COOLDOWN_SECONDS", 300, minimum=1)
+SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9._-]{1,62}$")
+REV_RE = re.compile(r"^(null|[0-9a-fA-F]{1,40})$")
+REF_TYPE_BRANCH = "branch"
+REF_TYPE_TAG = "tag"
+REF_TYPE_TIP = "tip"
+REF_TYPE_COMMIT = "commit"
+REF_TYPES = {REF_TYPE_BRANCH, REF_TYPE_TAG, REF_TYPE_TIP, REF_TYPE_COMMIT}
+PULL_REQUEST_REF_TYPES = {REF_TYPE_BRANCH, REF_TYPE_TIP}
+TARGET_PULL_REQUEST_REF_TYPES = {REF_TYPE_BRANCH}
+REF_PICKER_TABS = {"overview", "source", "commits", "tags", "branches"}
+REF_QUERY_KEYS = {"ref", "ref_type", "ref_value"}
+REF_VALUE_SEPARATOR = "|"
+DEFAULT_BRANCH_CANDIDATES = ("main", "master")
+SCRIPT_STYLE_RE = re.compile(r"(?is)<(script|style)\b[^>]*>.*?</\1>")
+RESERVED_USERNAMES = {
+ "dashboard",
+ "favicon.ico",
+ "git",
+ "hg",
+ "login",
+ "logout",
+ "new",
+ "settings",
+ "signup",
+ "static",
+ "harrisonerd",
+}
+CSRF_COOKIE_NAME = "csrf_token"
+CSRF_FORM_FIELD = "_csrf_token"
+NULL_REV = "null"
+NULL_NODE = "0" * 40
+EMPTY_TREE_NODE = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
+README_CANDIDATES = ("README.md", "README.rst", "README.txt", "README")
+MARKDOWN_EXTENSIONS = ("extra", "sane_lists")
+MARKDOWN_TAGS = {
+ "a",
+ "abbr",
+ "acronym",
+ "blockquote",
+ "br",
+ "code",
+ "del",
+ "details",
+ "em",
+ "h1",
+ "h2",
+ "h3",
+ "h4",
+ "h5",
+ "h6",
+ "hr",
+ "img",
+ "li",
+ "ol",
+ "p",
+ "pre",
+ "strong",
+ "summary",
+ "table",
+ "tbody",
+ "td",
+ "th",
+ "thead",
+ "tr",
+ "ul",
+}
+MARKDOWN_ATTRIBUTES = {
+ "a": ["href", "title"],
+ "abbr": ["title"],
+ "acronym": ["title"],
+ "img": ["alt", "src", "title"],
+ "td": ["align"],
+ "th": ["align"],
+}
+MARKDOWN_LINK_TAGS = {"a"}
+MARKDOWN_LINK_ATTRIBUTES = {"a": ["href", "title"]}
+HIGHLIGHT_LANGUAGE_BY_EXTENSION = {
+ ".c": "language-c",
+ ".cc": "language-cpp",
+ ".cpp": "language-cpp",
+ ".cs": "language-csharp",
+ ".css": "language-css",
+ ".go": "language-go",
+ ".h": "language-c",
+ ".hpp": "language-cpp",
+ ".html": "language-html",
+ ".ini": "language-ini",
+ ".java": "language-java",
+ ".js": "language-javascript",
+ ".json": "language-json",
+ ".jsx": "language-javascript",
+ ".lua": "language-lua",
+ ".md": "language-markdown",
+ ".php": "language-php",
+ ".py": "language-python",
+ ".rb": "language-ruby",
+ ".rs": "language-rust",
+ ".sh": "language-bash",
+ ".sql": "language-sql",
+ ".toml": "language-ini",
+ ".ts": "language-typescript",
+ ".tsx": "language-typescript",
+ ".txt": "language-plaintext",
+ ".xml": "language-xml",
+ ".yaml": "language-yaml",
+ ".yml": "language-yaml",
+}
+HIGHLIGHT_LANGUAGE_BY_NAME = {
+ "dockerfile": "language-dockerfile",
+ "makefile": "language-makefile",
+}
+SECURITY_HEADERS = {
+ "X-Content-Type-Options": "nosniff",
+ "Referrer-Policy": "same-origin",
+ "X-Frame-Options": "DENY",
+}
+CSP_HEADER = (
+ "default-src 'self'; "
+ "base-uri 'self'; "
+ "frame-ancestors 'none'; "
+ "form-action 'self'; "
+ "object-src 'none'; "
+ "img-src 'self' data: http: https:; "
+ "style-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com; "
+ "script-src 'self' 'unsafe-inline' https://cdnjs.cloudflare.com"
+)
+AUTH_FAILURES = {}
+
+POST_RECEIVE_HOOK = """#!/bin/sh
+set_default_head() {
+ for candidate in refs/heads/main refs/heads/master
+ do
+ if git rev-parse --verify -q "$candidate^{commit}" >/dev/null
+ then
+ git symbolic-ref HEAD "$candidate"
+ return 0
+ fi
+ done
+
+ fallback_ref=$(git for-each-ref --sort=-committerdate --format='%(refname)' refs/heads | sed -n '1p')
+ if [ -n "$fallback_ref" ]
+ then
+ git symbolic-ref HEAD "$fallback_ref"
+ fi
+}
+
+current_head=$(git symbolic-ref -q HEAD || true)
+if [ -n "$current_head" ] && git rev-parse --verify -q "$current_head^{commit}" >/dev/null
+then
+ exit 0
+fi
+
+set_default_head
+"""
+
+
+TEMPLATE_PATH.insert(0, str(BASE_DIR / "templates"))
+app = Bottle()
+
+
+class GitCommandError(RuntimeError):
+ def __init__(self, message, returncode=1):
+ super().__init__(message)
+ self.returncode = returncode
+
+
+class GitResponseTooLarge(RuntimeError):
+ pass
+
+
+def validate_startup_config():
+ if not DEBUG and SECRET_KEY == DEFAULT_SECRET_KEY:
+ raise RuntimeError("SECRET_KEY must be set to a non-default value when GITMAN_DEBUG is disabled.")
+
+
+def utcnow():
+ return dt.datetime.now(dt.UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
+
+
+def ensure_dirs():
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ REPO_ROOT.mkdir(parents=True, exist_ok=True)
+
+
+def configure_db_connection(conn):
+ conn.execute("PRAGMA foreign_keys = ON")
+ conn.execute(f"PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}")
+ conn.execute("PRAGMA synchronous = NORMAL")
+
+
+def db_connect():
+ conn = sqlite3.connect(DB_PATH, timeout=SQLITE_BUSY_TIMEOUT_MS / 1000)
+ conn.row_factory = sqlite3.Row
+ configure_db_connection(conn)
+ return conn
+
+
+def init_db():
+ ensure_dirs()
+ with db_connect() as conn:
+ conn.execute("PRAGMA journal_mode = WAL")
+ with db_connect() as conn:
+ conn.executescript(
+ """
+ CREATE TABLE IF NOT EXISTS users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ username TEXT NOT NULL UNIQUE,
+ password_hash TEXT NOT NULL,
+ display_name TEXT NOT NULL DEFAULT '',
+ bio TEXT NOT NULL DEFAULT '',
+ website TEXT NOT NULL DEFAULT '',
+ created_at TEXT NOT NULL
+ );
+
+ CREATE TABLE IF NOT EXISTS repositories (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ owner_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ name TEXT NOT NULL,
+ description TEXT NOT NULL DEFAULT '',
+ forked_from_repo_id INTEGER REFERENCES repositories(id) ON DELETE SET NULL,
+ forked_at TEXT,
+ forked_from_node TEXT NOT NULL DEFAULT '',
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL,
+ UNIQUE(owner_id, name)
+ );
+
+ CREATE TABLE IF NOT EXISTS issues (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ repo_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
+ author_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ number INTEGER NOT NULL,
+ title TEXT NOT NULL,
+ body TEXT NOT NULL DEFAULT '',
+ status TEXT NOT NULL DEFAULT 'open',
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL,
+ closed_at TEXT,
+ UNIQUE(repo_id, number)
+ );
+
+ CREATE TABLE IF NOT EXISTS issue_comments (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ issue_id INTEGER NOT NULL REFERENCES issues(id) ON DELETE CASCADE,
+ author_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ body TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL
+ );
+
+ CREATE TABLE IF NOT EXISTS repo_contributors (
+ repo_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
+ user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ added_by_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ created_at TEXT NOT NULL,
+ PRIMARY KEY (repo_id, user_id)
+ );
+
+ CREATE TABLE IF NOT EXISTS repo_stars (
+ repo_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
+ user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ created_at TEXT NOT NULL,
+ PRIMARY KEY (repo_id, user_id)
+ );
+
+ CREATE TABLE IF NOT EXISTS pull_requests (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ target_repo_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
+ source_repo_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
+ author_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ number INTEGER NOT NULL,
+ title TEXT NOT NULL,
+ body TEXT NOT NULL DEFAULT '',
+ status TEXT NOT NULL DEFAULT 'open',
+ base_node TEXT NOT NULL,
+ source_node TEXT NOT NULL,
+ target_ref_type TEXT NOT NULL DEFAULT '',
+ target_ref_name TEXT NOT NULL DEFAULT '',
+ source_ref_type TEXT NOT NULL DEFAULT '',
+ source_ref_name TEXT NOT NULL DEFAULT '',
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL,
+ closed_at TEXT,
+ merged_at TEXT,
+ merged_by_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
+ merge_node TEXT NOT NULL DEFAULT '',
+ UNIQUE(target_repo_id, number)
+ );
+
+ CREATE TABLE IF NOT EXISTS pull_request_comments (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ pull_request_id INTEGER NOT NULL REFERENCES pull_requests(id) ON DELETE CASCADE,
+ author_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ body TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL
+ );
+
+ CREATE TABLE IF NOT EXISTS commit_comments (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ repo_id INTEGER NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
+ commit_node TEXT NOT NULL,
+ author_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
+ body TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_repositories_owner ON repositories(owner_id);
+ CREATE INDEX IF NOT EXISTS idx_issues_repo_number ON issues(repo_id, number);
+ CREATE INDEX IF NOT EXISTS idx_issues_repo_status ON issues(repo_id, status);
+ CREATE INDEX IF NOT EXISTS idx_issue_comments_issue ON issue_comments(issue_id);
+ CREATE INDEX IF NOT EXISTS idx_repo_contributors_user ON repo_contributors(user_id);
+ CREATE INDEX IF NOT EXISTS idx_repo_stars_user ON repo_stars(user_id);
+ CREATE INDEX IF NOT EXISTS idx_pull_requests_target_status ON pull_requests(target_repo_id, status);
+ CREATE INDEX IF NOT EXISTS idx_pull_requests_source ON pull_requests(source_repo_id);
+ CREATE INDEX IF NOT EXISTS idx_pull_request_comments_pull_request ON pull_request_comments(pull_request_id);
+ CREATE INDEX IF NOT EXISTS idx_commit_comments_commit ON commit_comments(repo_id, commit_node);
+ """
+ )
+ ensure_user_profile_columns(conn)
+ ensure_repository_collaboration_columns(conn)
+ ensure_pull_request_ref_columns(conn)
+
+
+def ensure_user_profile_columns(conn):
+ columns = {row["name"] for row in conn.execute("PRAGMA table_info(users)")}
+ profile_columns = {
+ "display_name": "ALTER TABLE users ADD COLUMN display_name TEXT NOT NULL DEFAULT ''",
+ "bio": "ALTER TABLE users ADD COLUMN bio TEXT NOT NULL DEFAULT ''",
+ "website": "ALTER TABLE users ADD COLUMN website TEXT NOT NULL DEFAULT ''",
+ }
+ for name, ddl in profile_columns.items():
+ if name not in columns:
+ conn.execute(ddl)
+
+
+def ensure_repository_collaboration_columns(conn):
+ columns = {row["name"] for row in conn.execute("PRAGMA table_info(repositories)")}
+ collaboration_columns = {
+ "forked_from_repo_id": (
+ "ALTER TABLE repositories "
+ "ADD COLUMN forked_from_repo_id INTEGER REFERENCES repositories(id) ON DELETE SET NULL"
+ ),
+ "forked_at": "ALTER TABLE repositories ADD COLUMN forked_at TEXT",
+ "forked_from_node": "ALTER TABLE repositories ADD COLUMN forked_from_node TEXT NOT NULL DEFAULT ''",
+ }
+ for name, ddl in collaboration_columns.items():
+ if name not in columns:
+ conn.execute(ddl)
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_repositories_forked_from ON repositories(forked_from_repo_id)")
+
+
+def ensure_pull_request_ref_columns(conn):
+ columns = {row["name"] for row in conn.execute("PRAGMA table_info(pull_requests)")}
+ ref_columns = {
+ "target_ref_type": "ALTER TABLE pull_requests ADD COLUMN target_ref_type TEXT NOT NULL DEFAULT ''",
+ "target_ref_name": "ALTER TABLE pull_requests ADD COLUMN target_ref_name TEXT NOT NULL DEFAULT ''",
+ "source_ref_type": "ALTER TABLE pull_requests ADD COLUMN source_ref_type TEXT NOT NULL DEFAULT ''",
+ "source_ref_name": "ALTER TABLE pull_requests ADD COLUMN source_ref_name TEXT NOT NULL DEFAULT ''",
+ }
+ for name, ddl in ref_columns.items():
+ if name not in columns:
+ conn.execute(ddl)
+
+
+def normalize_slug(value, label):
+ slug = (value or "").strip().lower()
+ if not SLUG_RE.match(slug):
+ raise ValueError(f"{label} must be 2-63 characters: lowercase letters, numbers, dots, dashes, or underscores.")
+ if label.lower().startswith("username") and slug in RESERVED_USERNAMES:
+ raise ValueError("Username is reserved.")
+ if slug in {".", ".."} or slug.endswith(".git"):
+ raise ValueError(f"{label} is reserved.")
+ return slug
+
+
+def clean_repo_path(path):
+ raw = (path or "").strip("/")
+ if not raw:
+ return ""
+ raw_parts = raw.split("/")
+ if any(part in {"", ".", ".."} for part in raw_parts):
+ abort(400, "Invalid repository path.")
+ parts = PurePosixPath(raw).parts
+ if any(part in {"", ".", ".."} for part in parts):
+ abort(400, "Invalid repository path.")
+ return "/".join(parts)
+
+
+def hash_password(password):
+ salt = secrets.token_hex(16)
+ digest = hashlib.pbkdf2_hmac(
+ "sha256",
+ password.encode("utf-8"),
+ salt.encode("ascii"),
+ PASSWORD_ITERATIONS,
+ ).hex()
+ return f"pbkdf2_sha256${PASSWORD_ITERATIONS}${salt}${digest}"
+
+
+def verify_password(password, stored_hash):
+ try:
+ algorithm, iterations, salt, expected = stored_hash.split("$", 3)
+ if algorithm != "pbkdf2_sha256":
+ return False
+ digest = hashlib.pbkdf2_hmac(
+ "sha256",
+ password.encode("utf-8"),
+ salt.encode("ascii"),
+ int(iterations),
+ ).hex()
+ except (ValueError, TypeError):
+ return False
+ return hmac.compare_digest(digest, expected)
+
+
+def get_user_by_id(user_id):
+ try:
+ user_id = int(user_id)
+ except (TypeError, ValueError):
+ return None
+ with db_connect() as conn:
+ return conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
+
+
+def get_user_by_username(username):
+ with db_connect() as conn:
+ return conn.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchone()
+
+
+def current_user():
+ return request.environ.get("gitman.user")
+
+
+def request_is_secure():
+ forwarded = request.get_header("X-Forwarded-Proto", "")
+ if forwarded:
+ return forwarded.split(",", 1)[0].strip().lower() == "https"
+ return request.urlparts.scheme == "https"
+
+
+def is_git_request_path():
+ path = request.environ.get("PATH_INFO", request.path) or ""
+ return path == "/git" or path.startswith("/git/")
+
+
+def csrf_token():
+ token = request.environ.get("gitman.csrf_token")
+ if token:
+ return token
+
+ token = request.get_cookie(CSRF_COOKIE_NAME, secret=SECRET_KEY)
+ if not token:
+ token = secrets.token_urlsafe(32)
+ response.set_cookie(
+ CSRF_COOKIE_NAME,
+ token,
+ secret=SECRET_KEY,
+ httponly=True,
+ secure=request_is_secure(),
+ samesite="Lax",
+ path="/",
+ )
+ request.environ["gitman.csrf_token"] = token
+ return token
+
+
+def csrf_field():
+ return (
+ f'<input type="hidden" name="{CSRF_FORM_FIELD}" '
+ f'value="{html.escape(csrf_token(), quote=True)}">'
+ )
+
+
+def validate_csrf_token():
+ expected = request.get_cookie(CSRF_COOKIE_NAME, secret=SECRET_KEY)
+ submitted = request.forms.get(CSRF_FORM_FIELD, "")
+ if not expected or not submitted or not hmac.compare_digest(expected, submitted):
+ abort(403, "Invalid CSRF token.")
+
+
+def request_content_length():
+ try:
+ return int(request.environ.get("CONTENT_LENGTH") or 0)
+ except ValueError:
+ return 0
+
+
+def auth_rate_key(kind, identifier=""):
+ identifier = (identifier or "").strip().lower()[:100]
+ remote_addr = request.environ.get("REMOTE_ADDR", "")
+ return f"{kind}:{remote_addr}:{identifier}"
+
+
+def rate_limit_blocked(kind, identifier=""):
+ if not RATE_LIMIT_ENABLED:
+ return False
+ now = time.time()
+ prune_auth_failures(now)
+ record = AUTH_FAILURES.get(auth_rate_key(kind, identifier))
+ return bool(record and record.get("blocked_until", 0) > now)
+
+
+def prune_auth_failures(now=None):
+ now = now or time.time()
+ for key, record in list(AUTH_FAILURES.items()):
+ if record.get("reset_at", 0) <= now and record.get("blocked_until", 0) <= now:
+ AUTH_FAILURES.pop(key, None)
+
+
+def record_auth_failure(kind, identifier=""):
+ if not RATE_LIMIT_ENABLED:
+ return
+ now = time.time()
+ prune_auth_failures(now)
+ key = auth_rate_key(kind, identifier)
+ record = AUTH_FAILURES.get(key)
+ if not record or record.get("reset_at", 0) <= now:
+ record = {"count": 0, "reset_at": now + RATE_LIMIT_WINDOW_SECONDS, "blocked_until": 0}
+ record["count"] += 1
+ if record["count"] >= RATE_LIMIT_MAX_FAILURES:
+ record["blocked_until"] = now + RATE_LIMIT_COOLDOWN_SECONDS
+ AUTH_FAILURES[key] = record
+
+
+def clear_auth_failures(kind, identifier=""):
+ AUTH_FAILURES.pop(auth_rate_key(kind, identifier), None)
+
+
+def too_many_requests_response():
+ headers = {"Retry-After": str(RATE_LIMIT_COOLDOWN_SECONDS)}
+ return HTTPResponse(
+ "Too many failed attempts. Try again later.\n",
+ status=429,
+ headers=headers,
+ content_type="text/plain; charset=utf-8",
+ )
+
+
[email protected]("before_request")
+def load_current_user():
+ user_id = request.get_cookie("session", secret=SECRET_KEY)
+ request.environ["gitman.user"] = get_user_by_id(user_id) if user_id else None
+
+
[email protected]("before_request")
+def enforce_browser_post_security():
+ if request.method != "POST" or is_git_request_path():
+ return
+ if MAX_FORM_BYTES and request_content_length() > MAX_FORM_BYTES:
+ abort(413, "Request body too large.")
+ validate_csrf_token()
+
+
[email protected]("after_request")
+def add_security_headers():
+ for key, value in SECURITY_HEADERS.items():
+ response.headers.setdefault(key, value)
+ if not is_git_request_path():
+ response.headers.setdefault("Content-Security-Policy", CSP_HEADER)
+
+
+def render(template_name, **context):
+ context.setdefault("user", current_user())
+ context.setdefault("error", None)
+ context.setdefault("notice", None)
+ context.setdefault("csrf_field", csrf_field)
+ context.setdefault("render_markdown_links", render_markdown_links)
+ context.setdefault("render_repo_description", render_repo_description)
+ context.setdefault("format_ref_label", format_ref_label)
+ context.setdefault("url_with_ref", url_with_ref)
+ context.setdefault("current_url_with_ref", current_url_with_ref)
+ context.setdefault("ref_option_label", ref_option_label)
+ return template(template_name, **context)
+
+
+def login_user(user):
+ response.set_cookie(
+ "session",
+ str(user["id"]),
+ secret=SECRET_KEY,
+ httponly=True,
+ secure=request_is_secure(),
+ samesite="Lax",
+ path="/",
+ )
+
+
+def logout_user():
+ response.delete_cookie("session", path="/")
+
+
+def require_login():
+ user = current_user()
+ if user:
+ return user
+ next_url = request.fullpath if request.query_string else request.path
+ redirect("/login?next=" + quote(next_url, safe="/?=&"))
+
+
+def safe_next_url(value):
+ if value and value.startswith("/") and not value.startswith("//"):
+ return value
+ return "/"
+
+
+def repo_path(owner_username, repo_name):
+ path = REPO_ROOT / owner_username / repo_name
+ root = REPO_ROOT.resolve()
+ resolved = path.resolve()
+ if root != resolved and root not in resolved.parents:
+ abort(400, "Invalid repository path.")
+ return path
+
+
+def get_repo(owner_username, repo_name):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT repositories.*, users.username AS owner_username
+ FROM repositories
+ JOIN users ON users.id = repositories.owner_id
+ WHERE users.username = ? AND repositories.name = ?
+ """,
+ (owner_username, repo_name),
+ ).fetchone()
+
+
+def get_repo_by_id(repo_id):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT repositories.*, users.username AS owner_username
+ FROM repositories
+ JOIN users ON users.id = repositories.owner_id
+ WHERE repositories.id = ?
+ """,
+ (repo_id,),
+ ).fetchone()
+
+
+def list_public_repos(limit=100):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT repositories.*, users.username AS owner_username
+ FROM repositories
+ JOIN users ON users.id = repositories.owner_id
+ ORDER BY repositories.updated_at DESC
+ LIMIT ?
+ """,
+ (limit,),
+ ).fetchall()
+
+
+def text_preview(value, limit=180):
+ text = " ".join((value or "").split())
+ if len(text) <= limit:
+ return text
+ return text[: limit - 3].rstrip() + "..."
+
+
+def parse_activity_time(value):
+ raw = (value or "").strip()
+ if not raw:
+ return dt.datetime.min.replace(tzinfo=dt.UTC)
+ candidates = [raw]
+ if raw.endswith("Z"):
+ candidates.append(raw[:-1] + "+00:00")
+ for candidate in candidates:
+ try:
+ parsed = dt.datetime.fromisoformat(candidate)
+ if parsed.tzinfo is None:
+ parsed = parsed.replace(tzinfo=dt.UTC)
+ return parsed.astimezone(dt.UTC)
+ except ValueError:
+ pass
+ for date_format in ("%Y-%m-%d %H:%M:%S %z", "%Y-%m-%d %H:%M %z"):
+ try:
+ return dt.datetime.strptime(raw, date_format).astimezone(dt.UTC)
+ except ValueError:
+ pass
+ return dt.datetime.min.replace(tzinfo=dt.UTC)
+
+
+def activity_sort_key(action):
+ return (
+ parse_activity_time(action["occurred_at"]),
+ action["kind"],
+ action["target_url"],
+ )
+
+
+def normalize_activity_action(row):
+ action = dict(row)
+ actor_username = action.get("actor_username") or ""
+ action["actor_label"] = f"@{actor_username}" if actor_username else ""
+ action["actor_url"] = f"/{actor_username}" if actor_username else ""
+ action["detail"] = text_preview(action["detail"])
+ return action
+
+
+def list_activity_repositories():
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT repositories.*, users.username AS owner_username
+ FROM repositories
+ JOIN users ON users.id = repositories.owner_id
+ ORDER BY repositories.created_at DESC
+ """
+ ).fetchall()
+
+
+def list_commit_activity_actions(limit):
+ actions = []
+ for repo in list_activity_repositories():
+ path = repo_path(repo["owner_username"], repo["name"])
+ try:
+ commits = commit_log(path, limit=limit)
+ except (GitCommandError, OSError):
+ continue
+ for commit in commits:
+ actions.append(
+ {
+ "kind": "commit_created",
+ "occurred_at": commit["date"],
+ "actor_username": "",
+ "actor_label": commit["author"],
+ "actor_url": "",
+ "repo_owner_username": repo["owner_username"],
+ "repo_name": repo["name"],
+ "target_url": f"/{repo['owner_username']}/{repo['name']}/commits/{commit['node']}",
+ "target_label": commit["short_node"],
+ "summary": "committed",
+ "detail": text_preview(commit["summary"]),
+ }
+ )
+ return actions
+
+
+def list_recent_actions(limit=50):
+ limit = max(1, min(int(limit), 100))
+ with db_connect() as conn:
+ rows = conn.execute(
+ """
+ SELECT *
+ FROM (
+ SELECT
+ CASE
+ WHEN repositories.forked_from_repo_id IS NULL THEN 'repo_created'
+ ELSE 'repo_forked'
+ END AS kind,
+ repositories.created_at AS occurred_at,
+ owner.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name AS target_url,
+ owner.username || '/' || repositories.name AS target_label,
+ CASE
+ WHEN repositories.forked_from_repo_id IS NULL THEN 'created repository'
+ ELSE 'forked repository'
+ END AS summary,
+ repositories.description AS detail
+ FROM repositories
+ JOIN users AS owner ON owner.id = repositories.owner_id
+
+ UNION ALL
+
+ SELECT
+ 'repo_starred' AS kind,
+ repo_stars.created_at AS occurred_at,
+ actor.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name AS target_url,
+ owner.username || '/' || repositories.name AS target_label,
+ 'starred repository' AS summary,
+ '' AS detail
+ FROM repo_stars
+ JOIN users AS actor ON actor.id = repo_stars.user_id
+ JOIN repositories ON repositories.id = repo_stars.repo_id
+ JOIN users AS owner ON owner.id = repositories.owner_id
+
+ UNION ALL
+
+ SELECT
+ 'contributor_added' AS kind,
+ repo_contributors.created_at AS occurred_at,
+ actor.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name AS target_url,
+ owner.username || '/' || repositories.name AS target_label,
+ 'added contributor' AS summary,
+ contributor.username AS detail
+ FROM repo_contributors
+ JOIN users AS actor ON actor.id = repo_contributors.added_by_id
+ JOIN users AS contributor ON contributor.id = repo_contributors.user_id
+ JOIN repositories ON repositories.id = repo_contributors.repo_id
+ JOIN users AS owner ON owner.id = repositories.owner_id
+
+ UNION ALL
+
+ SELECT
+ 'issue_opened' AS kind,
+ issues.created_at AS occurred_at,
+ actor.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name || '/issues/' || issues.number AS target_url,
+ '#' || issues.number || ' ' || issues.title AS target_label,
+ 'opened issue' AS summary,
+ issues.body AS detail
+ FROM issues
+ JOIN users AS actor ON actor.id = issues.author_id
+ JOIN repositories ON repositories.id = issues.repo_id
+ JOIN users AS owner ON owner.id = repositories.owner_id
+
+ UNION ALL
+
+ SELECT
+ 'issue_commented' AS kind,
+ issue_comments.created_at AS occurred_at,
+ actor.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name || '/issues/' || issues.number AS target_url,
+ '#' || issues.number || ' ' || issues.title AS target_label,
+ 'commented on issue' AS summary,
+ issue_comments.body AS detail
+ FROM issue_comments
+ JOIN users AS actor ON actor.id = issue_comments.author_id
+ JOIN issues ON issues.id = issue_comments.issue_id
+ JOIN repositories ON repositories.id = issues.repo_id
+ JOIN users AS owner ON owner.id = repositories.owner_id
+
+ UNION ALL
+
+ SELECT
+ 'pull_request_opened' AS kind,
+ pull_requests.created_at AS occurred_at,
+ actor.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name || '/pulls/' || pull_requests.number AS target_url,
+ '#' || pull_requests.number || ' ' || pull_requests.title AS target_label,
+ 'opened pull request' AS summary,
+ pull_requests.body AS detail
+ FROM pull_requests
+ JOIN users AS actor ON actor.id = pull_requests.author_id
+ JOIN repositories ON repositories.id = pull_requests.target_repo_id
+ JOIN users AS owner ON owner.id = repositories.owner_id
+
+ UNION ALL
+
+ SELECT
+ 'pull_request_commented' AS kind,
+ pull_request_comments.created_at AS occurred_at,
+ actor.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name || '/pulls/' || pull_requests.number AS target_url,
+ '#' || pull_requests.number || ' ' || pull_requests.title AS target_label,
+ 'commented on pull request' AS summary,
+ pull_request_comments.body AS detail
+ FROM pull_request_comments
+ JOIN users AS actor ON actor.id = pull_request_comments.author_id
+ JOIN pull_requests ON pull_requests.id = pull_request_comments.pull_request_id
+ JOIN repositories ON repositories.id = pull_requests.target_repo_id
+ JOIN users AS owner ON owner.id = repositories.owner_id
+
+ UNION ALL
+
+ SELECT
+ 'pull_request_merged' AS kind,
+ pull_requests.merged_at AS occurred_at,
+ actor.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name || '/pulls/' || pull_requests.number AS target_url,
+ '#' || pull_requests.number || ' ' || pull_requests.title AS target_label,
+ 'merged pull request' AS summary,
+ pull_requests.merge_node AS detail
+ FROM pull_requests
+ JOIN users AS actor ON actor.id = pull_requests.merged_by_id
+ JOIN repositories ON repositories.id = pull_requests.target_repo_id
+ JOIN users AS owner ON owner.id = repositories.owner_id
+ WHERE pull_requests.merged_at IS NOT NULL
+
+ UNION ALL
+
+ SELECT
+ 'commit_commented' AS kind,
+ commit_comments.created_at AS occurred_at,
+ actor.username AS actor_username,
+ owner.username AS repo_owner_username,
+ repositories.name AS repo_name,
+ '/' || owner.username || '/' || repositories.name || '/commits/' || commit_comments.commit_node AS target_url,
+ substr(commit_comments.commit_node, 1, 12) AS target_label,
+ 'commented on commit' AS summary,
+ commit_comments.body AS detail
+ FROM commit_comments
+ JOIN users AS actor ON actor.id = commit_comments.author_id
+ JOIN repositories ON repositories.id = commit_comments.repo_id
+ JOIN users AS owner ON owner.id = repositories.owner_id
+
+ UNION ALL
+
+ SELECT
+ 'user_joined' AS kind,
+ users.created_at AS occurred_at,
+ users.username AS actor_username,
+ '' AS repo_owner_username,
+ '' AS repo_name,
+ '/' || users.username AS target_url,
+ '@' || users.username AS target_label,
+ 'joined' AS summary,
+ '' AS detail
+ FROM users
+ )
+ WHERE occurred_at IS NOT NULL
+ ORDER BY occurred_at DESC
+ LIMIT ?
+ """,
+ (limit,),
+ ).fetchall()
+
+ actions = [normalize_activity_action(row) for row in rows]
+ actions.extend(list_commit_activity_actions(limit))
+ actions.sort(key=activity_sort_key, reverse=True)
+ return actions[:limit]
+
+
+def list_owned_repos(owner_id):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT repositories.*, users.username AS owner_username, COUNT(repo_stars.user_id) AS star_count
+ FROM repositories
+ JOIN users ON users.id = repositories.owner_id
+ LEFT JOIN repo_stars ON repo_stars.repo_id = repositories.id
+ WHERE repositories.owner_id = ?
+ GROUP BY repositories.id
+ ORDER BY repositories.updated_at DESC
+ """,
+ (owner_id,),
+ ).fetchall()
+
+
+def list_starred_repos(user_id):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT
+ repositories.*,
+ users.username AS owner_username,
+ repo_stars.created_at AS starred_at,
+ (
+ SELECT COUNT(*)
+ FROM repo_stars AS counts
+ WHERE counts.repo_id = repositories.id
+ ) AS star_count
+ FROM repo_stars
+ JOIN repositories ON repositories.id = repo_stars.repo_id
+ JOIN users ON users.id = repositories.owner_id
+ WHERE repo_stars.user_id = ?
+ ORDER BY repo_stars.created_at DESC
+ """,
+ (user_id,),
+ ).fetchall()
+
+
+def list_user_forks_for_target(user_id, target_repo_id):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT repositories.*, users.username AS owner_username
+ FROM repositories
+ JOIN users ON users.id = repositories.owner_id
+ WHERE repositories.owner_id = ?
+ AND repositories.forked_from_repo_id = ?
+ ORDER BY repositories.updated_at DESC
+ """,
+ (user_id, target_repo_id),
+ ).fetchall()
+
+
+def user_has_fork_for_target(user_id, target_repo_id):
+ with db_connect() as conn:
+ row = conn.execute(
+ """
+ SELECT 1
+ FROM repositories
+ WHERE owner_id = ?
+ AND forked_from_repo_id = ?
+ LIMIT 1
+ """,
+ (user_id, target_repo_id),
+ ).fetchone()
+ return bool(row)
+
+
+def list_repo_contributors(repo_id):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT users.*, repo_contributors.created_at AS contributor_since
+ FROM repo_contributors
+ JOIN users ON users.id = repo_contributors.user_id
+ WHERE repo_contributors.repo_id = ?
+ ORDER BY users.username
+ """,
+ (repo_id,),
+ ).fetchall()
+
+
+def repo_contributor_usernames(repo_id):
+ return [row["username"] for row in list_repo_contributors(repo_id)]
+
+
+def repo_star_count(repo_id):
+ with db_connect() as conn:
+ return conn.execute(
+ "SELECT COUNT(*) FROM repo_stars WHERE repo_id = ?",
+ (repo_id,),
+ ).fetchone()[0]
+
+
+def user_starred_repo(user, repo):
+ if not user or not repo:
+ return False
+ with db_connect() as conn:
+ row = conn.execute(
+ "SELECT 1 FROM repo_stars WHERE repo_id = ? AND user_id = ?",
+ (repo["id"], user["id"]),
+ ).fetchone()
+ return bool(row)
+
+
+def star_repo(user, repo):
+ with db_connect() as conn:
+ conn.execute(
+ """
+ INSERT OR IGNORE INTO repo_stars (repo_id, user_id, created_at)
+ VALUES (?, ?, ?)
+ """,
+ (repo["id"], user["id"], utcnow()),
+ )
+
+
+def unstar_repo(user, repo):
+ with db_connect() as conn:
+ conn.execute(
+ "DELETE FROM repo_stars WHERE repo_id = ? AND user_id = ?",
+ (repo["id"], user["id"]),
+ )
+
+
+def normalize_website(value):
+ website = (value or "").strip()
+ if not website:
+ return ""
+ if any(char.isspace() for char in website):
+ raise ValueError("Website must be a valid http(s) URL.")
+ if "://" not in website:
+ website = "https://" + website
+ parsed = urlparse(website)
+ if parsed.scheme not in {"http", "https"} or not parsed.netloc:
+ raise ValueError("Website must be a valid http(s) URL.")
+ return website[:255]
+
+
+def profile_form_values(form, fallback=None):
+ fallback = dict(fallback) if fallback else {}
+ return {
+ "username": fallback.get("username", ""),
+ "display_name": (form.get("display_name", fallback.get("display_name", "")) or "").strip()[:80],
+ "bio": (form.get("bio", fallback.get("bio", "")) or "").strip()[:1000],
+ "website": (form.get("website", fallback.get("website", "")) or "").strip(),
+ "created_at": fallback.get("created_at", ""),
+ }
+
+
+def install_repo_hooks(path):
+ hooks_dir = path / "hooks"
+ hooks_dir.mkdir(exist_ok=True)
+ post_receive = hooks_dir / "post-receive"
+ post_receive.write_text(POST_RECEIVE_HOOK, encoding="utf-8")
+ post_receive.chmod(0o755)
+
+
+def prepare_repo_for_receive(path):
+ install_repo_hooks(path)
+ run_git(["config", "receive.denyDeleteCurrent", "warn"], cwd=path)
+
+
+def write_git_metadata(path, owner_username, repo_name, description):
+ safe_description = " ".join((description or "").splitlines()).strip()
+ description_file = path / "description"
+ if description_file.exists():
+ description_file.write_text(safe_description or f"{owner_username}/{repo_name}", encoding="utf-8")
+ prepare_repo_for_receive(path)
+ run_git(["config", "gitman.owner", owner_username], cwd=path)
+ run_git(["config", "gitman.name", repo_name], cwd=path)
+ run_git(["config", "gitman.description", safe_description], cwd=path)
+ run_git(["config", "http.receivepack", "true"], cwd=path)
+
+
+def sync_repo_git_config(repo):
+ path = repo_path(repo["owner_username"], repo["name"])
+ write_git_metadata(path, repo["owner_username"], repo["name"], repo["description"])
+
+
+def git_env():
+ env = os.environ.copy()
+ env["GIT_TERMINAL_PROMPT"] = "0"
+ path_parts = [part for part in env.get("PATH", "").split(os.pathsep) if part]
+ for part in DEFAULT_EXEC_PATH.split(os.pathsep):
+ if part not in path_parts:
+ path_parts.append(part)
+ env["PATH"] = os.pathsep.join(path_parts)
+ return env
+
+
+def git_executable(env):
+ if os.path.isabs(GIT_BINARY):
+ return GIT_BINARY
+ executable = shutil.which(GIT_BINARY, path=env.get("PATH"))
+ if executable:
+ return executable
+ raise GitCommandError(
+ "Git executable not found. Install git or set GITMAN_GIT_BINARY to the full path, such as /usr/bin/git.",
+ 127,
+ )
+
+
+def run_git(args, cwd=None, timeout=15, check=True, text=True):
+ env = git_env()
+ completed = subprocess.run(
+ [git_executable(env), *args],
+ cwd=cwd,
+ capture_output=True,
+ text=text,
+ encoding="utf-8" if text else None,
+ errors="replace" if text else None,
+ timeout=timeout,
+ env=env,
+ )
+ if check and completed.returncode != 0:
+ stderr = completed.stderr if text else completed.stderr.decode("utf-8", "replace")
+ raise GitCommandError(stderr.strip() or "Git command failed.", completed.returncode)
+ return completed
+
+
+def create_repository(owner, name, description):
+ now = utcnow()
+ path = repo_path(owner["username"], name)
+ if path.exists():
+ raise ValueError("Repository directory already exists.")
+ path.parent.mkdir(parents=True, exist_ok=True)
+
+ with db_connect() as conn:
+ try:
+ conn.execute(
+ """
+ INSERT INTO repositories (owner_id, name, description, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?)
+ """,
+ (owner["id"], name, description, now, now),
+ )
+ except sqlite3.IntegrityError as exc:
+ raise ValueError("Repository already exists.") from exc
+
+ try:
+ run_git(["init", "--bare", str(path)], timeout=20)
+ run_git(["symbolic-ref", "HEAD", "refs/heads/main"], cwd=path)
+ write_git_metadata(path, owner["username"], name, description)
+ except Exception:
+ with db_connect() as conn:
+ conn.execute(
+ "DELETE FROM repositories WHERE owner_id = ? AND name = ?",
+ (owner["id"], name),
+ )
+ if path.exists():
+ shutil.rmtree(path)
+ raise
+
+
+def fork_repository(owner, source_repo, name, description):
+ now = utcnow()
+ source_path = repo_path(source_repo["owner_username"], source_repo["name"])
+ path = repo_path(owner["username"], name)
+ if path.exists():
+ raise ValueError("Repository directory already exists.")
+ path.parent.mkdir(parents=True, exist_ok=True)
+ upstream_repo_id = source_repo["forked_from_repo_id"] or source_repo["id"]
+ forked_from_node = (
+ source_repo["forked_from_node"]
+ if source_repo["forked_from_repo_id"] and source_repo["forked_from_node"]
+ else default_code_ref(source_path).get("node") or NULL_REV
+ )
+
+ with db_connect() as conn:
+ try:
+ conn.execute(
+ """
+ INSERT INTO repositories (
+ owner_id, name, description, forked_from_repo_id,
+ forked_at, forked_from_node, created_at, updated_at
+ )
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ owner["id"],
+ name,
+ description,
+ upstream_repo_id,
+ now,
+ forked_from_node,
+ now,
+ now,
+ ),
+ )
+ except sqlite3.IntegrityError as exc:
+ raise ValueError("Repository already exists.") from exc
+
+ try:
+ run_git(["clone", "--bare", str(source_path), str(path)], timeout=60)
+ write_git_metadata(path, owner["username"], name, description)
+ except Exception:
+ with db_connect() as conn:
+ conn.execute(
+ "DELETE FROM repositories WHERE owner_id = ? AND name = ?",
+ (owner["id"], name),
+ )
+ if path.exists():
+ shutil.rmtree(path)
+ raise
+
+
+def delete_repository(repo, path):
+ with db_connect() as conn:
+ conn.execute("DELETE FROM repositories WHERE id = ?", (repo["id"],))
+ if path.exists():
+ shutil.rmtree(path)
+
+
+def add_repo_contributor(repo, added_by, username):
+ username = (username or "").strip().lower()
+ contributor = get_user_by_username(username)
+ if not contributor:
+ raise ValueError("User not found.")
+ if contributor["id"] == repo["owner_id"]:
+ raise ValueError("The owner already has access.")
+ now = utcnow()
+ with db_connect() as conn:
+ try:
+ conn.execute(
+ """
+ INSERT INTO repo_contributors (repo_id, user_id, added_by_id, created_at)
+ VALUES (?, ?, ?, ?)
+ """,
+ (repo["id"], contributor["id"], added_by["id"], now),
+ )
+ except sqlite3.IntegrityError as exc:
+ raise ValueError("User is already a contributor.") from exc
+ sync_repo_git_config(repo)
+
+
+def remove_repo_contributor(repo, user_id):
+ with db_connect() as conn:
+ conn.execute(
+ "DELETE FROM repo_contributors WHERE repo_id = ? AND user_id = ?",
+ (repo["id"], user_id),
+ )
+ sync_repo_git_config(repo)
+
+
+def git_files(path, revision="HEAD"):
+ if not revision or is_null_revision(revision):
+ return []
+ completed = run_git(["ls-tree", "-r", "--name-only", revision], cwd=path, check=False)
+ if completed.returncode != 0:
+ stderr = (completed.stderr or "").lower()
+ stdout = completed.stdout or ""
+ if (
+ not stderr and not stdout
+ ) or "unknown revision" in stderr or "bad revision" in stderr or "not a valid object name" in stderr:
+ return []
+ raise GitCommandError(completed.stderr.strip() or "Unable to list files.", completed.returncode)
+ return [line.strip() for line in completed.stdout.splitlines() if line.strip()]
+
+
+def git_cat(path, file_path, revision="HEAD", text=True):
+ completed = run_git(["show", f"{revision}:{file_path}"], cwd=path, check=True, text=text)
+ return completed.stdout if text else completed.stdout
+
+
+def truncate_bytes_for_render(content):
+ if not MAX_RENDER_BYTES or len(content) <= MAX_RENDER_BYTES:
+ return content, False
+ return content[:MAX_RENDER_BYTES], True
+
+
+def truncate_text_for_render(content, label="Preview"):
+ encoded = (content or "").encode("utf-8", "replace")
+ truncated, was_truncated = truncate_bytes_for_render(encoded)
+ text = truncated.decode("utf-8", "replace")
+ if was_truncated:
+ text = text.rstrip() + f"\n\n[{label} truncated. Use the raw view or local clone for the full content.]\n"
+ return text, was_truncated
+
+
+def read_file_bytes(path, file_path, revision="HEAD"):
+ if not revision or is_null_revision(revision):
+ raise GitCommandError("File not found.")
+ completed = run_git(["show", f"{revision}:{file_path}"], cwd=path, check=False, text=False)
+ if completed.returncode != 0:
+ raise GitCommandError(completed.stderr.decode("utf-8", "replace").strip() or "Unable to read file.")
+ return completed.stdout
+
+
+def build_tree(files, subpath):
+ prefix = f"{subpath}/" if subpath else ""
+ entries = {}
+ for file_path in files:
+ if prefix and not file_path.startswith(prefix):
+ continue
+ rest = file_path[len(prefix) :]
+ if not rest:
+ continue
+ name, _, _remaining = rest.partition("/")
+ full_path = f"{prefix}{name}" if prefix else name
+ entries[name] = {
+ "name": name,
+ "path": full_path,
+ "type": "dir" if "/" in rest else "file",
+ }
+ return sorted(entries.values(), key=lambda item: (item["type"] != "dir", item["name"].lower()))
+
+
+def readme_for_repo(path, files, revision="HEAD"):
+ by_lower = {file_path.lower(): file_path for file_path in files}
+ for candidate in README_CANDIDATES:
+ actual = by_lower.get(candidate.lower())
+ if actual:
+ try:
+ return actual, git_cat(path, actual, revision=revision)
+ except GitCommandError:
+ return actual, ""
+ return None, None
+
+
+def readme_preview_for_repo(path, files, revision="HEAD"):
+ name, readme = readme_for_repo(path, files, revision=revision)
+ if readme is None:
+ return name, readme, False
+ readme, truncated = truncate_text_for_render(readme, label="README preview")
+ return name, readme, truncated
+
+
+def is_markdown_file(file_path):
+ return bool(file_path and file_path.lower().endswith((".md", ".markdown", ".mdown")))
+
+
+def highlight_language_class(file_path):
+ path = Path(file_path)
+ by_name = HIGHLIGHT_LANGUAGE_BY_NAME.get(path.name.lower())
+ if by_name:
+ return by_name
+ return HIGHLIGHT_LANGUAGE_BY_EXTENSION.get(path.suffix.lower(), "")
+
+
+def render_markdown(text):
+ text = SCRIPT_STYLE_RE.sub("", text or "")
+ rendered = markdown.markdown(
+ text,
+ extensions=MARKDOWN_EXTENSIONS,
+ output_format="html5",
+ )
+ return bleach.clean(
+ rendered,
+ tags=MARKDOWN_TAGS,
+ attributes=MARKDOWN_ATTRIBUTES,
+ protocols={"http", "https", "mailto"},
+ strip=True,
+ )
+
+
+def render_markdown_links(text):
+ rendered = markdown.markdown(
+ html.escape(text or "", quote=False),
+ output_format="html5",
+ )
+ return bleach.clean(
+ rendered,
+ tags=MARKDOWN_LINK_TAGS,
+ attributes=MARKDOWN_LINK_ATTRIBUTES,
+ protocols={"http", "https", "mailto"},
+ strip=True,
+ ).strip()
+
+
+def render_repo_description(text):
+ return render_markdown_links(text)
+
+
+def is_null_revision(value):
+ return (value or "").strip().lower() in {NULL_REV, NULL_NODE}
+
+
+def strip_git_record_separator(value):
+ value = (value or "").rstrip("\n")
+ if value.endswith("\x1e"):
+ value = value[:-1]
+ return value
+
+
+def newest_revision_sort_key(item):
+ return (item.get("date", ""), item.get("name", ""))
+
+
+def commit_log(path, limit=50, revision=None):
+ revision = revision_or_default(path, revision)
+ if is_null_revision(revision):
+ return []
+ format_arg = "%H%x1f%h%x1f%an%x1f%ad%x1f%s%x1e"
+ completed = run_git(
+ ["log", "-n", str(limit), "--date=iso-strict", f"--format={format_arg}", revision],
+ cwd=path,
+ check=False,
+ )
+ if completed.returncode != 0:
+ stderr = (completed.stderr or "").lower()
+ if "does not have any commits" in stderr or "bad revision" in stderr:
+ return []
+ raise GitCommandError(completed.stderr.strip() or "Unable to read commit log.", completed.returncode)
+ commits = []
+ for record in completed.stdout.split("\x1e"):
+ if not record:
+ continue
+ parts = record.split("\x1f")
+ if len(parts) != 5:
+ continue
+ commits.append(
+ {
+ "rev": "",
+ "node": parts[0],
+ "short_node": parts[1],
+ "author": parts[2],
+ "date": parts[3],
+ "summary": parts[4],
+ }
+ )
+ return commits
+
+
+def list_repo_tags(path):
+ completed = run_git(["for-each-ref", "--format=%(refname:short)", "refs/tags"], cwd=path, check=False)
+ if completed.returncode != 0:
+ raise GitCommandError(completed.stderr.strip() or "Unable to read repository tags.", completed.returncode)
+
+ tags = []
+ for name in completed.stdout.splitlines():
+ name = name.strip()
+ if not name:
+ continue
+ commit = revision_info(path, f"refs/tags/{name}^{{commit}}")
+ if not commit:
+ continue
+ tags.append(
+ {
+ "type": REF_TYPE_TAG,
+ "name": name,
+ "label": f"tag {name}",
+ "rev": "",
+ "node": commit["node"],
+ "short_node": commit["short_node"],
+ "branch": "",
+ "active": False,
+ "closed": False,
+ "local": False,
+ "is_default": False,
+ "date": commit["date"],
+ "summary": commit["summary"],
+ }
+ )
+ tags.sort(key=newest_revision_sort_key, reverse=True)
+ return tags
+
+
+def revision_info(path, revision):
+ if not revision:
+ return None
+ format_arg = "%H%x1f%h%x1f%ad%x1f%s%x1e"
+ completed = run_git(
+ ["show", "-s", "--date=iso-strict", f"--format={format_arg}", revision],
+ cwd=path,
+ check=False,
+ )
+ if completed.returncode != 0 or not completed.stdout:
+ return None
+ parts = strip_git_record_separator(completed.stdout).split("\x1f")
+ if len(parts) != 4:
+ return None
+ if is_null_revision(parts[0]):
+ return None
+ return {
+ "rev": "",
+ "node": parts[0],
+ "short_node": parts[1],
+ "branch": "",
+ "date": parts[2],
+ "summary": parts[3],
+ }
+
+
+def empty_tip_ref(is_default=False):
+ return {
+ "type": REF_TYPE_TIP,
+ "name": "",
+ "label": "HEAD",
+ "rev": "",
+ "node": None,
+ "short_node": "",
+ "branch": "",
+ "date": "",
+ "summary": "",
+ "active": False,
+ "closed": False,
+ "is_default": is_default,
+ }
+
+
+def tip_ref(path, is_default=False):
+ info = revision_info(path, "HEAD")
+ if not info:
+ return empty_tip_ref(is_default=is_default)
+ info.update(
+ {
+ "type": REF_TYPE_TIP,
+ "name": "",
+ "label": "HEAD",
+ "active": False,
+ "closed": False,
+ "is_default": is_default,
+ }
+ )
+ return info
+
+
+def commit_ref(path, revision):
+ revision = (revision or "").strip()
+ if not REV_RE.match(revision) or revision == NULL_REV:
+ raise ValueError("Commit not found.")
+ info = revision_info(path, revision)
+ if not info:
+ raise ValueError("Commit not found.")
+ info.update(
+ {
+ "type": REF_TYPE_COMMIT,
+ "name": info["node"],
+ "label": f"commit {info['short_node']}",
+ "active": False,
+ "closed": False,
+ "is_default": False,
+ }
+ )
+ return info
+
+
+def list_repo_branches(path):
+ format_arg = "%(refname:short)%00%(objectname)%00%(objectname:short)%00%(committerdate:iso-strict)%00%(subject)"
+ completed = run_git(
+ ["for-each-ref", f"--format={format_arg}", "refs/heads"],
+ cwd=path,
+ check=False,
+ )
+ if completed.returncode != 0:
+ raise GitCommandError(completed.stderr.strip() or "Unable to read repository branches.", completed.returncode)
+
+ head_branch = repo_head_branch(path)
+ branches = []
+ for record in completed.stdout.splitlines():
+ if not record:
+ continue
+ parts = record.split("\x00")
+ if len(parts) != 5:
+ continue
+ branches.append(
+ {
+ "type": REF_TYPE_BRANCH,
+ "name": parts[0],
+ "label": f"branch {parts[0]}",
+ "node": parts[1],
+ "short_node": parts[2],
+ "rev": "",
+ "active": parts[0] == head_branch,
+ "closed": False,
+ "date": parts[3],
+ "summary": parts[4],
+ "is_default": False,
+ }
+ )
+ branches.sort(key=newest_revision_sort_key, reverse=True)
+ return branches
+
+
+def choose_default_branch(branches, head_branch=""):
+ for branch in branches:
+ if branch["name"] == head_branch:
+ return branch
+ for branch_name in DEFAULT_BRANCH_CANDIDATES:
+ for branch in branches:
+ if branch["name"] == branch_name:
+ return branch
+ return branches[0] if branches else None
+
+
+def default_code_ref(path):
+ head_branch = repo_head_branch(path)
+ selected_branch = choose_default_branch(list_repo_branches(path), head_branch)
+ if selected_branch:
+ selected = dict(selected_branch)
+ selected["active"] = True
+ selected["is_default"] = True
+ if selected["name"] != head_branch:
+ set_repo_head_branch(path, selected["name"])
+ return selected
+ return tip_ref(path, is_default=True)
+
+
+def revision_or_default(path, revision):
+ if revision is None:
+ revision = default_code_ref(path).get("node")
+ return revision or NULL_REV
+
+
+def resolve_repo_ref(path, ref_type, ref_name=""):
+ ref_type = (ref_type or "").strip().lower()
+ ref_name = ref_name or ""
+ if ref_type == REF_TYPE_TIP:
+ return tip_ref(path)
+ if ref_type == REF_TYPE_COMMIT:
+ return commit_ref(path, ref_name)
+ if ref_type == REF_TYPE_BRANCH:
+ for branch in list_repo_branches(path):
+ if branch["name"] == ref_name:
+ return dict(branch)
+ raise ValueError("Branch not found.")
+ if ref_type == REF_TYPE_TAG:
+ for tag in list_repo_tags(path):
+ if tag["name"] == ref_name:
+ return dict(tag)
+ raise ValueError("Tag not found.")
+ raise ValueError("Ref not found.")
+
+
+def ref_option_value(ref_type, ref_name=""):
+ return REF_VALUE_SEPARATOR.join((ref_type, quote(ref_name or "", safe="")))
+
+
+def source_ref_option_value(repo_id, ref_type, ref_name=""):
+ return REF_VALUE_SEPARATOR.join((str(repo_id), ref_type, quote(ref_name or "", safe="")))
+
+
+def parse_ref_option_value(value, allowed_types=REF_TYPES):
+ parts = (value or "").split(REF_VALUE_SEPARATOR, 1)
+ if len(parts) != 2 or parts[0] not in allowed_types:
+ raise ValueError("Invalid ref.")
+ return parts[0], unquote(parts[1])
+
+
+def parse_source_ref_option_value(value):
+ parts = (value or "").split(REF_VALUE_SEPARATOR, 2)
+ if len(parts) != 3 or parts[1] not in PULL_REQUEST_REF_TYPES:
+ raise ValueError("Invalid source ref.")
+ try:
+ repo_id = int(parts[0])
+ except ValueError as exc:
+ raise ValueError("Invalid source repository.") from exc
+ return repo_id, parts[1], unquote(parts[2])
+
+
+def selected_repo_ref(path):
+ ref_value = request.query.get("ref_value")
+ if ref_value:
+ try:
+ ref_type, ref_name = parse_ref_option_value(ref_value)
+ return resolve_repo_ref(path, ref_type, ref_name)
+ except ValueError as exc:
+ abort(404, str(exc))
+
+ ref_type = (request.query.get("ref_type") or "").strip().lower()
+ if not ref_type:
+ return default_code_ref(path)
+ if ref_type not in REF_TYPES:
+ abort(404, "Ref not found.")
+ try:
+ return resolve_repo_ref(path, ref_type, request.query.get("ref") or "")
+ except ValueError as exc:
+ abort(404, str(exc))
+
+
+def ref_revision(ref_info):
+ if not ref_info:
+ return None
+ return ref_info.get("node") or NULL_REV
+
+
+def ref_query_string(ref_info, force=False):
+ if not ref_info or (ref_info.get("is_default") and not force):
+ return ""
+ ref_type = ref_info.get("type") or REF_TYPE_TIP
+ params = {"ref_type": ref_type}
+ if ref_type != REF_TYPE_TIP:
+ params["ref"] = ref_info.get("name", "")
+ return urlencode(params)
+
+
+def url_with_ref(url, ref_info=None, force=False):
+ query = ref_query_string(ref_info, force=force)
+ if not query:
+ return url
+ separator = "&" if "?" in url else "?"
+ return f"{url}{separator}{query}"
+
+
+def current_url_with_ref(ref_info=None, force=False):
+ params = [
+ (key, value)
+ for key, value in request.query.allitems()
+ if key not in REF_QUERY_KEYS
+ ]
+ query = ref_query_string(ref_info, force=force)
+ if query:
+ params.extend(parse_qsl(query, keep_blank_values=True))
+ encoded = urlencode(params)
+ return request.path + (f"?{encoded}" if encoded else "")
+
+
+def format_ref_label(ref_type, ref_name=""):
+ ref_type = (ref_type or REF_TYPE_TIP).strip().lower()
+ if ref_type == REF_TYPE_BRANCH:
+ return f"branch {ref_name}"
+ if ref_type == REF_TYPE_TAG:
+ return f"tag {ref_name}"
+ if ref_type == REF_TYPE_COMMIT:
+ return f"commit {ref_name[:12]}" if ref_name else "commit"
+ return "HEAD"
+
+
+def ref_option_label(ref):
+ label = format_ref_label(ref["type"], ref.get("name", ""))
+ return label
+
+
+def ref_option_from_ref(ref, repo_id=None):
+ if repo_id is None:
+ value = ref_option_value(ref["type"], ref.get("name", ""))
+ else:
+ value = source_ref_option_value(repo_id, ref["type"], ref.get("name", ""))
+ return {
+ "value": value,
+ "label": ref_option_label(ref),
+ "ref": ref,
+ }
+
+
+def repo_ref_options(path, include_closed_branches=True, include_tip=True, include_tags=True):
+ branches = list_repo_branches(path)
+ refs = []
+ for branch in branches:
+ if include_closed_branches or not branch["closed"]:
+ refs.append(branch)
+ if include_tags:
+ refs.extend(list_repo_tags(path))
+
+ refs.sort(key=newest_revision_sort_key, reverse=True)
+ options = []
+ for index, ref in enumerate(refs):
+ option = ref_option_from_ref(ref)
+ option["is_initial"] = index < 10
+ options.append(option)
+ if include_tip:
+ tip_option = ref_option_from_ref(tip_ref(path))
+ tip_option["is_initial"] = False
+ options.append(tip_option)
+ return options
+
+
+def source_repo_ref_options(source_repo, include_tip=True):
+ path = repo_path(source_repo["owner_username"], source_repo["name"])
+ options = repo_ref_options(path, include_closed_branches=True, include_tip=include_tip, include_tags=False)
+ for option in options:
+ option["value"] = source_ref_option_value(
+ source_repo["id"],
+ option["ref"]["type"],
+ option["ref"].get("name", ""),
+ )
+ option["label"] = f"{source_repo['owner_username']}/{source_repo['name']} {option['label']}"
+ return options
+
+
+def target_repo_ref_options(path):
+ return repo_ref_options(path, include_closed_branches=False, include_tip=False, include_tags=False)
+
+
+def commit_count(path, revision=None):
+ revision = revision_or_default(path, revision)
+ if is_null_revision(revision):
+ return 0
+ completed = run_git(["rev-list", "--count", revision], cwd=path, check=False)
+ if completed.returncode != 0:
+ stderr = (completed.stderr or "").lower()
+ if "does not have any commits" in stderr or "bad revision" in stderr:
+ return 0
+ raise GitCommandError(completed.stderr.strip() or "Unable to count commits.", completed.returncode)
+ try:
+ return int(completed.stdout.strip())
+ except ValueError:
+ return 0
+
+
+def repo_head_ref(path):
+ completed = run_git(["symbolic-ref", "--quiet", "HEAD"], cwd=path, check=False)
+ if completed.returncode == 0:
+ return completed.stdout.strip()
+ return ""
+
+
+def set_repo_head_branch(path, branch_name):
+ if branch_name:
+ run_git(["symbolic-ref", "HEAD", f"refs/heads/{branch_name}"], cwd=path, check=False)
+
+
+def validate_revision_id(value, allow_null=True):
+ revision = (value or "").strip()
+ if not REV_RE.match(revision) or (is_null_revision(revision) and not allow_null):
+ abort(404, "Revision not found.")
+ return revision
+
+
+def repo_head_branch(path):
+ head_ref = repo_head_ref(path)
+ if head_ref.startswith("refs/heads/"):
+ return head_ref.removeprefix("refs/heads/") or "main"
+ return "main"
+
+
+def repo_tip_node(path):
+ completed = run_git(["rev-parse", "--verify", "HEAD^{commit}"], cwd=path, check=False)
+ if completed.returncode != 0:
+ stderr = (completed.stderr or "").lower()
+ if "needed a single revision" in stderr or "unknown revision" in stderr or "ambiguous argument" in stderr:
+ return None
+ raise GitCommandError(completed.stderr.strip() or "Unable to read repository HEAD.", completed.returncode)
+ node = completed.stdout.strip()
+ if is_null_revision(node):
+ return None
+ return node or None
+
+
+def repo_has_revision(path, revision):
+ revision = validate_revision_id(revision)
+ if is_null_revision(revision):
+ return True
+ completed = run_git(["cat-file", "-e", f"{revision}^{{commit}}"], cwd=path, check=False)
+ return completed.returncode == 0
+
+
+def is_ancestor(path, ancestor_node, descendant_node):
+ ancestor_node = validate_revision_id(ancestor_node)
+ descendant_node = validate_revision_id(descendant_node, allow_null=False)
+ if is_null_revision(ancestor_node) or ancestor_node == descendant_node:
+ return True
+ completed = run_git(["merge-base", "--is-ancestor", ancestor_node, descendant_node], cwd=path, check=False)
+ return completed.returncode == 0
+
+
+def commit_detail(path, node):
+ node = validate_revision_id(node, allow_null=False)
+ format_arg = "%H%x1f%h%x1f%an%x1f%ad%x1f%B%x1f%P%x1e"
+ completed = run_git(
+ ["show", "-s", "--date=iso-strict", f"--format={format_arg}", node],
+ cwd=path,
+ check=False,
+ )
+ if completed.returncode != 0 or not completed.stdout:
+ abort(404, "Commit not found.")
+ parts = strip_git_record_separator(completed.stdout).split("\x1f")
+ if len(parts) != 6:
+ abort(404, "Commit not found.")
+ return {
+ "rev": "",
+ "node": parts[0],
+ "short_node": parts[1],
+ "author": parts[2],
+ "date": parts[3],
+ "description": parts[4].strip(),
+ "parents": parts[5],
+ }
+
+
+def commit_diff(path, node):
+ node = validate_revision_id(node, allow_null=False)
+ completed = run_git(["show", "--format=", "--patch", "--find-renames", "--root", node], cwd=path, check=False)
+ if completed.returncode != 0:
+ raise GitCommandError(completed.stderr.strip() or "Unable to read commit diff.", completed.returncode)
+ return truncate_text_for_render(completed.stdout, label="Diff")[0]
+
+
+def diff_between_revisions(path, base_node, source_node):
+ base_node = validate_revision_id(base_node)
+ source_node = validate_revision_id(source_node, allow_null=False)
+ if is_null_revision(base_node):
+ args = ["diff", "--patch", "--find-renames", EMPTY_TREE_NODE, source_node]
+ else:
+ args = ["diff", "--patch", "--find-renames", base_node, source_node]
+ completed = run_git(args, cwd=path, check=False)
+ if completed.returncode != 0:
+ raise GitCommandError(completed.stderr.strip() or "Unable to read diff.", completed.returncode)
+ return truncate_text_for_render(completed.stdout, label="Diff")[0]
+
+
+def ensure_clean_working_copy(path):
+ completed = run_git(["status", "--porcelain"], cwd=path, check=False)
+ if completed.returncode != 0:
+ raise GitCommandError(completed.stderr.strip() or "Unable to inspect working copy.", completed.returncode)
+ if completed.stdout.strip():
+ raise GitCommandError("Target repository has uncommitted working copy changes.")
+
+
+def issue_counts(repo_id):
+ with db_connect() as conn:
+ rows = conn.execute(
+ "SELECT status, COUNT(*) AS count FROM issues WHERE repo_id = ? GROUP BY status",
+ (repo_id,),
+ ).fetchall()
+ counts = {"open": 0, "closed": 0}
+ for row in rows:
+ counts[row["status"]] = row["count"]
+ return counts
+
+
+def pull_request_counts(repo_id):
+ with db_connect() as conn:
+ rows = conn.execute(
+ "SELECT status, COUNT(*) AS count FROM pull_requests WHERE target_repo_id = ? GROUP BY status",
+ (repo_id,),
+ ).fetchall()
+ counts = {"open": 0, "closed": 0, "merged": 0}
+ for row in rows:
+ counts[row["status"]] = row["count"]
+ return counts
+
+
+def list_issues(repo_id, status="open"):
+ if status not in {"open", "closed", "all"}:
+ status = "open"
+ where = "WHERE issues.repo_id = ?"
+ params = [repo_id]
+ if status != "all":
+ where += " AND issues.status = ?"
+ params.append(status)
+ with db_connect() as conn:
+ return conn.execute(
+ f"""
+ SELECT issues.*, users.username AS author_username
+ FROM issues
+ JOIN users ON users.id = issues.author_id
+ {where}
+ ORDER BY issues.updated_at DESC, issues.number DESC
+ """,
+ params,
+ ).fetchall()
+
+
+def get_issue(repo_id, number):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT issues.*, users.username AS author_username
+ FROM issues
+ JOIN users ON users.id = issues.author_id
+ WHERE issues.repo_id = ? AND issues.number = ?
+ """,
+ (repo_id, number),
+ ).fetchone()
+
+
+def list_issue_comments(issue_id):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT issue_comments.*, users.username AS author_username
+ FROM issue_comments
+ JOIN users ON users.id = issue_comments.author_id
+ WHERE issue_comments.issue_id = ?
+ ORDER BY issue_comments.created_at ASC, issue_comments.id ASC
+ """,
+ (issue_id,),
+ ).fetchall()
+
+
+def list_pull_request_comments(pull_request_id):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT pull_request_comments.*, users.username AS author_username
+ FROM pull_request_comments
+ JOIN users ON users.id = pull_request_comments.author_id
+ WHERE pull_request_comments.pull_request_id = ?
+ ORDER BY pull_request_comments.created_at ASC, pull_request_comments.id ASC
+ """,
+ (pull_request_id,),
+ ).fetchall()
+
+
+def list_commit_comments(repo_id, commit_node):
+ with db_connect() as conn:
+ return conn.execute(
+ """
+ SELECT commit_comments.*, users.username AS author_username
+ FROM commit_comments
+ JOIN users ON users.id = commit_comments.author_id
+ WHERE commit_comments.repo_id = ? AND commit_comments.commit_node = ?
+ ORDER BY commit_comments.created_at ASC, commit_comments.id ASC
+ """,
+ (repo_id, commit_node),
+ ).fetchall()
+
+
+def pull_request_select_sql(where_clause):
+ return f"""
+ SELECT
+ pull_requests.*,
+ author.username AS author_username,
+ source_repo.name AS source_repo_name,
+ source_owner.username AS source_owner_username,
+ target_repo.name AS target_repo_name,
+ target_owner.username AS target_owner_username,
+ merged_by.username AS merged_by_username
+ FROM pull_requests
+ JOIN users AS author ON author.id = pull_requests.author_id
+ JOIN repositories AS source_repo ON source_repo.id = pull_requests.source_repo_id
+ JOIN users AS source_owner ON source_owner.id = source_repo.owner_id
+ JOIN repositories AS target_repo ON target_repo.id = pull_requests.target_repo_id
+ JOIN users AS target_owner ON target_owner.id = target_repo.owner_id
+ LEFT JOIN users AS merged_by ON merged_by.id = pull_requests.merged_by_id
+ {where_clause}
+ """
+
+
+def list_pull_requests(repo_id, status="open"):
+ if status not in {"open", "closed", "merged", "all"}:
+ status = "open"
+ where = "WHERE pull_requests.target_repo_id = ?"
+ params = [repo_id]
+ if status != "all":
+ where += " AND pull_requests.status = ?"
+ params.append(status)
+ with db_connect() as conn:
+ return conn.execute(
+ pull_request_select_sql(where) + " ORDER BY pull_requests.updated_at DESC, pull_requests.number DESC",
+ params,
+ ).fetchall()
+
+
+def get_pull_request(repo_id, number):
+ with db_connect() as conn:
+ return conn.execute(
+ pull_request_select_sql(
+ "WHERE pull_requests.target_repo_id = ? AND pull_requests.number = ?"
+ ),
+ (repo_id, number),
+ ).fetchone()
+
+
+def stored_ref_name(ref):
+ return "" if ref["type"] == REF_TYPE_TIP else ref.get("name", "")
+
+
+def pr_ref_type(pr, prefix):
+ return pr[f"{prefix}_ref_type"] or REF_TYPE_TIP
+
+
+def pr_ref_name(pr, prefix):
+ return pr[f"{prefix}_ref_name"] or ""
+
+
+def resolve_pr_ref(path, pr, prefix):
+ return resolve_repo_ref(path, pr_ref_type(pr, prefix), pr_ref_name(pr, prefix))
+
+
+def pull_request_base_node(target_repo, source_repo, target_ref):
+ source_path = repo_path(source_repo["owner_username"], source_repo["name"])
+ target_node = target_ref.get("node") or NULL_REV
+ if repo_has_revision(source_path, target_node):
+ return target_node
+ fork_base = source_repo["forked_from_node"] or NULL_REV
+ if repo_has_revision(source_path, fork_base):
+ return fork_base
+ return NULL_REV
+
+
+def create_pull_request(
+ target_repo,
+ source_repo,
+ author,
+ title,
+ body,
+ source_ref_type,
+ source_ref_name,
+ target_ref_type,
+ target_ref_name,
+):
+ if not source_repo:
+ raise ValueError("Choose a source repository.")
+ source_is_target = source_repo["id"] == target_repo["id"]
+ if not source_is_target and (
+ source_repo["owner_id"] != author["id"] or source_repo["forked_from_repo_id"] != target_repo["id"]
+ ):
+ raise ValueError("Choose one of your forks of this repository.")
+ target_path = repo_path(target_repo["owner_username"], target_repo["name"])
+ source_path = repo_path(source_repo["owner_username"], source_repo["name"])
+ source_ref = resolve_repo_ref(source_path, source_ref_type, source_ref_name)
+ target_ref = resolve_repo_ref(target_path, target_ref_type, target_ref_name)
+ if source_ref["type"] not in PULL_REQUEST_REF_TYPES:
+ raise ValueError("Choose a branch or HEAD as the source ref.")
+ if source_is_target and source_ref["type"] != REF_TYPE_BRANCH:
+ raise ValueError("Choose a branch from this repository.")
+ if target_ref["type"] not in TARGET_PULL_REQUEST_REF_TYPES:
+ raise ValueError("Choose a target branch.")
+ if (
+ source_is_target
+ and source_ref["type"] == REF_TYPE_BRANCH
+ and target_ref["type"] == REF_TYPE_BRANCH
+ and source_ref["name"] == target_ref["name"]
+ ):
+ raise ValueError("Choose different source and target branches.")
+ source_node = source_ref.get("node")
+ if not source_node:
+ raise ValueError("Source repository has no commits.")
+ base_node = pull_request_base_node(target_repo, source_repo, target_ref)
+ now = utcnow()
+ with db_connect() as conn:
+ number = conn.execute(
+ "SELECT COALESCE(MAX(number), 0) + 1 FROM pull_requests WHERE target_repo_id = ?",
+ (target_repo["id"],),
+ ).fetchone()[0]
+ conn.execute(
+ """
+ INSERT INTO pull_requests (
+ target_repo_id, source_repo_id, author_id, number, title, body,
+ base_node, source_node, target_ref_type, target_ref_name,
+ source_ref_type, source_ref_name, created_at, updated_at
+ )
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ target_repo["id"],
+ source_repo["id"],
+ author["id"],
+ number,
+ title[:200],
+ body[:5000],
+ base_node,
+ source_node,
+ target_ref["type"],
+ stored_ref_name(target_ref),
+ source_ref["type"],
+ stored_ref_name(source_ref),
+ now,
+ now,
+ ),
+ )
+ return number
+
+
+def pull_request_diff(pr):
+ source_repo = get_repo_by_id(pr["source_repo_id"])
+ if not source_repo:
+ raise GitCommandError("Source repository no longer exists.")
+ source_path = repo_path(source_repo["owner_username"], source_repo["name"])
+ try:
+ source_ref = resolve_pr_ref(source_path, pr, "source")
+ except ValueError as exc:
+ raise GitCommandError(str(exc)) from exc
+ source_node = source_ref.get("node")
+ if not source_node:
+ raise GitCommandError("Source repository has no commits.")
+ base_node = pr["base_node"] or NULL_REV
+ if not repo_has_revision(source_path, base_node):
+ raise GitCommandError("The pull request base revision is not present in the source repository.")
+ return diff_between_revisions(source_path, base_node, source_node), source_node, source_ref
+
+
+def source_fetch_ref(source_ref):
+ if source_ref["type"] == REF_TYPE_BRANCH:
+ return f"refs/heads/{source_ref['name']}"
+ return "HEAD"
+
+
+def merge_pull_request(pr, user):
+ if pr["status"] != "open":
+ raise ValueError("Only open pull requests can be merged.")
+ target_repo = get_repo_by_id(pr["target_repo_id"])
+ source_repo = get_repo_by_id(pr["source_repo_id"])
+ if not target_repo or not source_repo:
+ raise ValueError("Pull request repositories are no longer available.")
+
+ target_path = repo_path(target_repo["owner_username"], target_repo["name"])
+ source_path = repo_path(source_repo["owner_username"], source_repo["name"])
+ try:
+ source_ref = resolve_pr_ref(source_path, pr, "source")
+ target_ref = resolve_pr_ref(target_path, pr, "target")
+ except ValueError as exc:
+ raise ValueError(str(exc)) from exc
+ if target_ref["type"] != REF_TYPE_BRANCH:
+ raise ValueError("The target ref must be a branch.")
+ source_node = source_ref.get("node")
+ if not source_node:
+ raise ValueError("Source repository has no commits.")
+
+ target_node_before = target_ref.get("node") or NULL_REV
+ with tempfile.TemporaryDirectory(prefix="gitman-merge-") as tempdir:
+ work_path = Path(tempdir) / "work"
+ clone = run_git(["clone", str(target_path), str(work_path)], check=False, timeout=60)
+ if clone.returncode != 0:
+ raise GitCommandError(clone.stderr.strip() or "Unable to clone target repository.", clone.returncode)
+
+ run_git(["config", "user.name", user["username"]], cwd=work_path)
+ run_git(["config", "user.email", f"{user['username']}@gitman.local"], cwd=work_path)
+ checkout = run_git(
+ ["checkout", "-B", target_ref["name"], target_node_before],
+ cwd=work_path,
+ check=False,
+ timeout=30,
+ )
+ if checkout.returncode != 0:
+ raise GitCommandError(checkout.stderr.strip() or "Unable to check out target branch.", checkout.returncode)
+
+ fetched = run_git(
+ ["fetch", str(source_path), source_fetch_ref(source_ref)],
+ cwd=work_path,
+ check=False,
+ timeout=60,
+ )
+ if fetched.returncode != 0:
+ raise GitCommandError(fetched.stderr.strip() or "Unable to fetch source changes.", fetched.returncode)
+ fetched_node = run_git(["rev-parse", "--verify", "FETCH_HEAD"], cwd=work_path).stdout.strip()
+ if fetched_node != source_node:
+ source_node = fetched_node
+
+ if not repo_has_revision(work_path, source_node):
+ raise GitCommandError("Source revision was not fetched into the target repository.")
+
+ if target_node_before != NULL_REV and is_ancestor(work_path, source_node, target_node_before):
+ merge_node = target_node_before
+ elif target_node_before == NULL_REV or is_ancestor(work_path, target_node_before, source_node):
+ merge = run_git(["merge", "--ff-only", "FETCH_HEAD"], cwd=work_path, check=False, timeout=60)
+ if merge.returncode != 0:
+ raise GitCommandError(merge.stderr.strip() or "Unable to fast-forward target branch.", merge.returncode)
+ merge_node = source_node
+ else:
+ commit_message = (
+ f"Merge pull request #{pr['number']} from "
+ f"{source_repo['owner_username']}/{source_repo['name']}\n\n{pr['title']}"
+ )
+ merge = run_git(
+ ["merge", "--no-ff", "FETCH_HEAD", "-m", commit_message],
+ cwd=work_path,
+ check=False,
+ timeout=60,
+ )
+ if merge.returncode != 0:
+ run_git(["merge", "--abort"], cwd=work_path, check=False)
+ message = merge.stderr.strip() or merge.stdout.strip() or "Merge has conflicts."
+ raise GitCommandError(message, merge.returncode)
+ merge_node = repo_tip_node(work_path) or source_node
+
+ push = run_git(
+ ["push", "origin", f"{merge_node}:refs/heads/{target_ref['name']}"],
+ cwd=work_path,
+ check=False,
+ timeout=60,
+ )
+ if push.returncode != 0:
+ raise GitCommandError(push.stderr.strip() or "Unable to update target branch.", push.returncode)
+
+ now = utcnow()
+ with db_connect() as conn:
+ conn.execute(
+ """
+ UPDATE pull_requests
+ SET status = 'merged',
+ source_node = ?,
+ updated_at = ?,
+ closed_at = ?,
+ merged_at = ?,
+ merged_by_id = ?,
+ merge_node = ?
+ WHERE id = ?
+ """,
+ (source_node, now, now, now, user["id"], merge_node, pr["id"]),
+ )
+ conn.execute(
+ "UPDATE repositories SET updated_at = ? WHERE id = ?",
+ (now, target_repo["id"]),
+ )
+ return merge_node
+
+
+def render_pull_request_detail(repo, path, pr, error=None, notice=None, comment_value=""):
+ diff = ""
+ diff_error = None
+ current_source_node = pr["source_node"]
+ current_source_ref = None
+ try:
+ diff, current_source_node, current_source_ref = pull_request_diff(pr)
+ except GitCommandError as exc:
+ diff_error = str(exc)
+ return render(
+ "pull_request_detail.tpl",
+ repo=repo,
+ pr=pr,
+ comments=list_pull_request_comments(pr["id"]),
+ comment_value=comment_value,
+ diff=diff,
+ diff_error=diff_error,
+ current_source_node=current_source_node,
+ current_source_ref=current_source_ref,
+ error=error,
+ notice=notice,
+ **repo_page_context(repo, path),
+ )
+
+
+def render_commit_detail(repo, path, commit, error=None, notice=None, comment_value=""):
+ return render(
+ "commit_detail.tpl",
+ repo=repo,
+ commit=commit,
+ commit_source_ref=commit_ref(path, commit["node"]),
+ diff=commit_diff(path, commit["node"]),
+ comments=list_commit_comments(repo["id"], commit["node"]),
+ comment_value=comment_value,
+ error=error,
+ notice=notice,
+ **repo_page_context(repo, path),
+ )
+
+
+def user_owns_repo(user, repo):
+ return bool(user and repo and user["id"] == repo["owner_id"])
+
+
+def user_contributes_to_repo(user, repo):
+ if not user or not repo:
+ return False
+ with db_connect() as conn:
+ row = conn.execute(
+ "SELECT 1 FROM repo_contributors WHERE repo_id = ? AND user_id = ?",
+ (repo["id"], user["id"]),
+ ).fetchone()
+ return bool(row)
+
+
+def user_can_maintain_repo(user, repo):
+ return user_owns_repo(user, repo) or user_contributes_to_repo(user, repo)
+
+
+def repo_page_context(repo, path=None, selected_ref=None):
+ if path is None:
+ path = repo_path(repo["owner_username"], repo["name"])
+ user = current_user()
+ active_tab = repo_active_tab(repo)
+ show_ref_picker = active_tab in REF_PICKER_TABS
+ if selected_ref is None:
+ selected_ref = selected_repo_ref(path) if show_ref_picker else default_code_ref(path)
+ selected_revision = ref_revision(selected_ref)
+ fork_target_id = repo["forked_from_repo_id"] or repo["id"]
+ source_repo = get_repo_by_id(repo["forked_from_repo_id"]) if repo["forked_from_repo_id"] else None
+ return {
+ "commit_count": commit_count(path, selected_revision),
+ "issue_counts": issue_counts(repo["id"]),
+ "pr_counts": pull_request_counts(repo["id"]),
+ "star_count": repo_star_count(repo["id"]),
+ "is_starred": user_starred_repo(user, repo),
+ "is_owner": user_owns_repo(user, repo),
+ "can_maintain": user_can_maintain_repo(user, repo),
+ "has_fork": bool(user and user_has_fork_for_target(user["id"], fork_target_id)),
+ "repo_active_tab": active_tab,
+ "show_ref_picker": show_ref_picker,
+ "source_repo": source_repo,
+ "selected_ref": selected_ref,
+ "selected_ref_label": ref_option_label(selected_ref) if selected_ref else "",
+ "ref_options": repo_ref_options(path) if show_ref_picker else [],
+ "selected_ref_value": ref_option_value(
+ selected_ref.get("type", REF_TYPE_TIP),
+ selected_ref.get("name", ""),
+ )
+ if selected_ref
+ else "",
+ }
+
+
+def repo_active_tab(repo):
+ base_path = f"/{repo['owner_username']}/{repo['name']}"
+ current_path = request.path.rstrip("/")
+ if current_path == base_path:
+ return "overview"
+ for tab, suffix in (
+ ("source", "/src"),
+ ("commits", "/commits"),
+ ("tags", "/tags"),
+ ("branches", "/branches"),
+ ("issues", "/issues"),
+ ("pulls", "/pulls"),
+ ("settings", "/settings"),
+ ):
+ if current_path == base_path + suffix or current_path.startswith(base_path + suffix + "/"):
+ return tab
+ return ""
+
+
+def quote_path(path):
+ return quote(path, safe="/")
+
+
+def clone_url(owner_username, repo_name):
+ scheme = request.get_header("X-Forwarded-Proto") or request.urlparts.scheme
+ host = request.get_header("Host")
+ if host and host.startswith("0.0.0.0"):
+ host = "127.0.0.1" + host[len("0.0.0.0") :]
+ return f"{scheme}://{host}/git/{owner_username}/{repo_name}"
+
+
+def parse_basic_auth(rate_limit_kind=None, clear_on_success=True):
+ header = request.get_header("Authorization", "")
+ if not header.lower().startswith("basic "):
+ return None, None
+ token = header.split(" ", 1)[1].strip()
+ try:
+ decoded = base64.b64decode(token).decode("utf-8")
+ username, password = decoded.split(":", 1)
+ except (ValueError, UnicodeDecodeError, base64.binascii.Error):
+ if rate_limit_kind:
+ if rate_limit_blocked(rate_limit_kind, ""):
+ return None, "rate_limited"
+ record_auth_failure(rate_limit_kind, "")
+ return None, "invalid"
+ username = username.strip().lower()
+ if rate_limit_kind and rate_limit_blocked(rate_limit_kind, username):
+ return None, "rate_limited"
+ user = get_user_by_username(username)
+ if user and verify_password(password, user["password_hash"]):
+ if rate_limit_kind and clear_on_success:
+ clear_auth_failures(rate_limit_kind, username)
+ return user, None
+ if rate_limit_kind:
+ record_auth_failure(rate_limit_kind, username)
+ return None, "invalid"
+
+
+def git_service_from_request():
+ service = request.query.get("service")
+ if service:
+ return service
+ path_info = request.environ.get("PATH_INFO", "")
+ parts = path_info.strip("/").split("/")
+ if len(parts) >= 4 and parts[0] == "git":
+ return parts[3]
+ return ""
+
+
+def is_git_write_request():
+ return git_service_from_request() == "git-receive-pack"
+
+
+def basic_auth_challenge(message="Authentication required."):
+ return HTTPResponse(
+ message + "\n",
+ status=401,
+ headers={"WWW-Authenticate": 'Basic realm="GitMan"'},
+ content_type="text/plain; charset=utf-8",
+ )
+
+
+def git_http_backend_executable():
+ found = shutil.which("git-http-backend")
+ if found:
+ return found
+ completed = run_git(["--exec-path"], check=False)
+ if completed.returncode == 0:
+ candidate = Path(completed.stdout.strip()) / "git-http-backend"
+ if candidate.exists():
+ return str(candidate)
+ raise GitCommandError("git-http-backend executable was not found.")
+
+
+def git_http_backend_response(repo, auth_user):
+ mount = f"/git/{repo['owner_username']}/{repo['name']}"
+ original_path = request.environ.get("PATH_INFO", request.path)
+ rest = original_path[len(mount) :] if original_path.startswith(mount) else ""
+ body = request.body.read() if request.method == "POST" else b""
+
+ env = git_env()
+ env.update(
+ {
+ "GIT_PROJECT_ROOT": str(REPO_ROOT),
+ "GIT_HTTP_EXPORT_ALL": "1",
+ "PATH_INFO": f"/{repo['owner_username']}/{repo['name']}{rest}",
+ "REQUEST_METHOD": request.method,
+ "QUERY_STRING": request.query_string or "",
+ "CONTENT_TYPE": request.environ.get("CONTENT_TYPE", ""),
+ "CONTENT_LENGTH": str(len(body)),
+ "REMOTE_ADDR": request.environ.get("REMOTE_ADDR", ""),
+ }
+ )
+ if auth_user:
+ env["REMOTE_USER"] = auth_user["username"]
+
+ completed = subprocess.run(
+ [git_http_backend_executable()],
+ input=body,
+ capture_output=True,
+ timeout=60,
+ env=env,
+ )
+ if MAX_GIT_RESPONSE_BYTES and len(completed.stdout) > MAX_GIT_RESPONSE_BYTES:
+ return HTTPResponse(
+ "Git response too large.\n",
+ status=413,
+ content_type="text/plain; charset=utf-8",
+ )
+ if completed.returncode != 0 and not completed.stdout:
+ message = completed.stderr.decode("utf-8", "replace").strip() or "Git HTTP backend failed."
+ raise GitCommandError(message, completed.returncode)
+
+ raw = completed.stdout
+ if b"\r\n\r\n" in raw:
+ header_bytes, response_body = raw.split(b"\r\n\r\n", 1)
+ header_lines = header_bytes.decode("latin-1").split("\r\n")
+ elif b"\n\n" in raw:
+ header_bytes, response_body = raw.split(b"\n\n", 1)
+ header_lines = header_bytes.decode("latin-1").split("\n")
+ else:
+ header_lines = []
+ response_body = raw
+
+ status_code = 200
+ headers = {}
+ for line in header_lines:
+ if not line or ":" not in line:
+ continue
+ key, value = line.split(":", 1)
+ key = key.strip()
+ value = value.strip()
+ if key.lower() == "status":
+ try:
+ status_code = int(value.split(" ", 1)[0])
+ except ValueError:
+ status_code = 200
+ else:
+ headers[key] = value
+
+ return HTTPResponse(body=response_body, status=status_code, headers=headers)
+
+
[email protected]("/static/<filename:path>")
+def static_assets(filename):
+ return static_file(filename, root=str(BASE_DIR / "static"))
+
+
[email protected]("/favicon.ico")
+def favicon():
+ return HTTPResponse(status=204)
+
+
[email protected]("/")
+def index():
+ return render("index.tpl", actions=list_recent_actions(50))
+
+
[email protected]("/signup", method=["GET", "POST"])
+def signup():
+ if request.method == "GET":
+ return render("signup.tpl", next_url=safe_next_url(request.query.get("next")))
+
+ username_raw = request.forms.get("username", "")
+ password = request.forms.get("password", "")
+ next_url = safe_next_url(request.forms.get("next"))
+ signup_identifier = username_raw.strip().lower()
+ if rate_limit_blocked("signup", signup_identifier):
+ return too_many_requests_response()
+ try:
+ username = normalize_slug(username_raw, "Username")
+ if len(password) < 8:
+ raise ValueError("Password must be at least 8 characters.")
+ with db_connect() as conn:
+ conn.execute(
+ "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
+ (username, hash_password(password), utcnow()),
+ )
+ user = get_user_by_username(username)
+ login_user(user)
+ clear_auth_failures("signup", username)
+ redirect(next_url)
+ except (sqlite3.IntegrityError, ValueError) as exc:
+ record_auth_failure("signup", signup_identifier)
+ message = "Username already exists." if isinstance(exc, sqlite3.IntegrityError) else str(exc)
+ return render("signup.tpl", error=message, username=username_raw, next_url=next_url)
+
+
[email protected]("/login", method=["GET", "POST"])
+def login():
+ if request.method == "GET":
+ return render("login.tpl", next_url=safe_next_url(request.query.get("next")))
+
+ username = request.forms.get("username", "").strip().lower()
+ password = request.forms.get("password", "")
+ next_url = safe_next_url(request.forms.get("next"))
+ if rate_limit_blocked("login", username):
+ return too_many_requests_response()
+ user = get_user_by_username(username)
+ if not user or not verify_password(password, user["password_hash"]):
+ record_auth_failure("login", username)
+ return render("login.tpl", error="Invalid username or password.", username=username, next_url=next_url)
+ clear_auth_failures("login", username)
+ login_user(user)
+ redirect(next_url)
+
+
[email protected]("/logout")
+def logout():
+ logout_user()
+ redirect("/")
+
+
[email protected]("/settings/profile", method=["GET", "POST"])
+def edit_profile():
+ user = require_login()
+ if request.method == "GET":
+ return render("edit_profile.tpl", profile=user)
+
+ values = profile_form_values(request.forms, user)
+ try:
+ values["website"] = normalize_website(values["website"])
+ with db_connect() as conn:
+ conn.execute(
+ """
+ UPDATE users
+ SET display_name = ?, bio = ?, website = ?
+ WHERE id = ?
+ """,
+ (values["display_name"], values["bio"], values["website"], user["id"]),
+ )
+ refreshed = get_user_by_id(user["id"])
+ request.environ["gitman.user"] = refreshed
+ return render("edit_profile.tpl", profile=refreshed, notice="Profile updated.")
+ except ValueError as exc:
+ return render("edit_profile.tpl", profile=values, error=str(exc))
+
+
[email protected]("/new", method=["GET", "POST"])
+def new_repo():
+ user = require_login()
+ if request.method == "GET":
+ return render("new_repo.tpl")
+
+ repo_name_raw = request.forms.get("name", "")
+ description = request.forms.get("description", "").strip()[:500]
+ try:
+ repo_name = normalize_slug(repo_name_raw, "Repository name")
+ create_repository(user, repo_name, description)
+ redirect(f"/{user['username']}/{repo_name}")
+ except (ValueError, GitCommandError) as exc:
+ return render("new_repo.tpl", error=str(exc), name=repo_name_raw, description=description)
+
+
[email protected]("/<username>")
+def user_profile(username):
+ profile_user = get_user_by_username(username.lower())
+ if not profile_user:
+ abort(404, "User not found.")
+ user = current_user()
+ active_tab = request.query.get("tab", "owned")
+ if active_tab not in {"owned", "stars"}:
+ active_tab = "owned"
+ owned_repos = list_owned_repos(profile_user["id"])
+ starred_repos = list_starred_repos(profile_user["id"])
+ return render(
+ "profile.tpl",
+ profile_user=profile_user,
+ owned_repos=owned_repos,
+ starred_repos=starred_repos,
+ repos=starred_repos if active_tab == "stars" else owned_repos,
+ active_tab=active_tab,
+ is_self=bool(user and user["id"] == profile_user["id"]),
+ )
+
+
[email protected]("/<owner>/<repo_name>")
+def repo_overview(owner, repo_name):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ selected_ref = selected_repo_ref(path)
+ revision = ref_revision(selected_ref)
+ files = git_files(path, revision)
+ readme_name, readme, readme_truncated = readme_preview_for_repo(path, files, revision=revision)
+ readme_is_markdown = is_markdown_file(readme_name)
+ context = repo_page_context(repo, path, selected_ref=selected_ref)
+ return render(
+ "repo.tpl",
+ repo=repo,
+ clone_url=clone_url(owner, repo_name),
+ readme_name=readme_name,
+ readme=readme,
+ readme_html=render_markdown(readme) if readme is not None and readme_is_markdown else None,
+ readme_is_markdown=readme_is_markdown,
+ readme_truncated=readme_truncated,
+ **context,
+ )
+
+
[email protected]("/<owner>/<repo_name>/star")
+def repo_star(owner, repo_name):
+ user = require_login()
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ action = request.forms.get("action", "star")
+ if action == "unstar":
+ unstar_repo(user, repo)
+ else:
+ star_repo(user, repo)
+ redirect(f"/{owner}/{repo_name}")
+
+
[email protected]("/<owner>/<repo_name>/fork", method=["GET", "POST"])
+def fork_repo(owner, repo_name):
+ user = require_login()
+ source_repo = get_repo(owner, repo_name)
+ if not source_repo:
+ abort(404, "Repository not found.")
+ default_description = source_repo["description"]
+ if request.method == "GET":
+ return render(
+ "fork_repo.tpl",
+ source_repo=source_repo,
+ name=source_repo["name"],
+ description=default_description,
+ )
+
+ repo_name_raw = request.forms.get("name", "")
+ description = request.forms.get("description", default_description).strip()[:500]
+ try:
+ fork_name = normalize_slug(repo_name_raw, "Repository name")
+ fork_repository(user, source_repo, fork_name, description)
+ redirect(f"/{user['username']}/{fork_name}")
+ except (ValueError, GitCommandError) as exc:
+ return render(
+ "fork_repo.tpl",
+ source_repo=source_repo,
+ name=repo_name_raw,
+ description=description,
+ error=str(exc),
+ )
+
+
[email protected]("/<owner>/<repo_name>/settings", method=["GET", "POST"])
+def repo_settings(owner, repo_name):
+ user = require_login()
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ if not user_owns_repo(user, repo):
+ abort(403, "Only the owner can update repository settings.")
+
+ path = repo_path(owner, repo_name)
+ if request.method == "GET":
+ return render(
+ "repo_settings.tpl",
+ repo=repo,
+ contributors=list_repo_contributors(repo["id"]),
+ contributor_username="",
+ **repo_page_context(repo, path),
+ )
+
+ action = request.forms.get("action", "save")
+ if action == "delete":
+ confirmation = request.forms.get("confirm_name", "").strip()
+ if confirmation != repo["name"]:
+ return render(
+ "repo_settings.tpl",
+ repo=repo,
+ contributors=list_repo_contributors(repo["id"]),
+ contributor_username="",
+ error=f'Type "{repo["name"]}" to confirm deletion.',
+ **repo_page_context(repo, path),
+ )
+ delete_repository(repo, path)
+ redirect(f"/{owner}")
+
+ if action == "add_contributor":
+ contributor_username = request.forms.get("username", "").strip()
+ try:
+ add_repo_contributor(repo, user, contributor_username)
+ redirect(f"/{owner}/{repo_name}/settings")
+ except ValueError as exc:
+ return render(
+ "repo_settings.tpl",
+ repo=repo,
+ contributors=list_repo_contributors(repo["id"]),
+ contributor_username=contributor_username,
+ error=str(exc),
+ **repo_page_context(repo, path),
+ )
+
+ if action == "remove_contributor":
+ try:
+ contributor_user_id = int(request.forms.get("user_id", ""))
+ except ValueError:
+ abort(400, "Invalid contributor.")
+ remove_repo_contributor(repo, contributor_user_id)
+ redirect(f"/{owner}/{repo_name}/settings")
+
+ description = request.forms.get("description", "").strip()[:500]
+ with db_connect() as conn:
+ conn.execute(
+ "UPDATE repositories SET description = ?, updated_at = ? WHERE id = ?",
+ (description, utcnow(), repo["id"]),
+ )
+ updated_repo = get_repo(owner, repo_name)
+ sync_repo_git_config(updated_repo)
+ return render(
+ "repo_settings.tpl",
+ repo=updated_repo,
+ contributors=list_repo_contributors(updated_repo["id"]),
+ contributor_username="",
+ notice="Repository settings updated.",
+ **repo_page_context(updated_repo, path),
+ )
+
+
[email protected]("/<owner>/<repo_name>/src")
[email protected]("/<owner>/<repo_name>/src/<file_path:path>")
+def repo_source(owner, repo_name, file_path=""):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ file_path = clean_repo_path(file_path)
+ path = repo_path(owner, repo_name)
+ selected_ref = selected_repo_ref(path)
+ revision = ref_revision(selected_ref)
+ files = git_files(path, revision)
+
+ if file_path in files:
+ content = read_file_bytes(path, file_path, revision=revision)
+ is_binary = b"\0" in content[:4096]
+ preview_content, preview_truncated = truncate_bytes_for_render(content)
+ return render(
+ "file.tpl",
+ repo=repo,
+ file_path=file_path,
+ content=preview_content.decode("utf-8", "replace") if not is_binary else "",
+ is_binary=is_binary,
+ language_class=highlight_language_class(file_path),
+ size=len(content),
+ preview_truncated=preview_truncated,
+ quote_path=quote_path,
+ **repo_page_context(repo, path, selected_ref=selected_ref),
+ )
+
+ if file_path and not any(item.startswith(file_path + "/") for item in files):
+ abort(404, "Path not found.")
+
+ return render(
+ "source.tpl",
+ repo=repo,
+ current_path=file_path,
+ entries=build_tree(files, file_path),
+ quote_path=quote_path,
+ **repo_page_context(repo, path, selected_ref=selected_ref),
+ )
+
+
[email protected]("/<owner>/<repo_name>/raw/<file_path:path>")
+def repo_raw(owner, repo_name, file_path):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ file_path = clean_repo_path(file_path)
+ path = repo_path(owner, repo_name)
+ selected_ref = selected_repo_ref(path)
+ revision = ref_revision(selected_ref)
+ files = git_files(path, revision)
+ if file_path not in files:
+ abort(404, "File not found.")
+ content = read_file_bytes(path, file_path, revision=revision)
+ content_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
+ return HTTPResponse(content, content_type=content_type)
+
+
[email protected]("/<owner>/<repo_name>/archive/<node>.zip")
+def repo_archive(owner, repo_name, node):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ node = validate_revision_id(node, allow_null=False)
+ path = repo_path(owner, repo_name)
+ completed = run_git(["archive", "--format=zip", node], cwd=path, check=False, text=False)
+ if completed.returncode != 0:
+ abort(404, "Archive not found.")
+ return HTTPResponse(
+ completed.stdout,
+ content_type="application/zip",
+ headers={"Content-Disposition": f'attachment; filename="{repo_name}-{node[:12]}.zip"'},
+ )
+
+
[email protected]("/<owner>/<repo_name>/commits")
+def repo_commits(owner, repo_name):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ selected_ref = selected_repo_ref(path)
+ revision = ref_revision(selected_ref)
+ return render(
+ "commits.tpl",
+ repo=repo,
+ commits=commit_log(path, revision=revision),
+ **repo_page_context(repo, path, selected_ref=selected_ref),
+ )
+
+
[email protected]("/<owner>/<repo_name>/tags")
+def repo_tags(owner, repo_name):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ return render(
+ "tags.tpl",
+ repo=repo,
+ tags=list_repo_tags(path),
+ clone_url=clone_url(owner, repo_name),
+ **repo_page_context(repo, path),
+ )
+
+
[email protected]("/<owner>/<repo_name>/branches")
+def repo_branches(owner, repo_name):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ return render(
+ "branches.tpl",
+ repo=repo,
+ branches=list_repo_branches(path),
+ clone_url=clone_url(owner, repo_name),
+ **repo_page_context(repo, path),
+ )
+
+
[email protected]("/<owner>/<repo_name>/commits/<node>", method=["GET", "POST"])
+def repo_commit(owner, repo_name, node):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ commit = commit_detail(path, node)
+ if request.method == "POST":
+ user = require_login()
+ action = request.forms.get("action")
+ if action == "comment":
+ body = request.forms.get("body", "").strip()
+ if not body:
+ return render_commit_detail(
+ repo,
+ path,
+ commit,
+ error="Comment body is required.",
+ comment_value=body,
+ )
+ now = utcnow()
+ with db_connect() as conn:
+ conn.execute(
+ """
+ INSERT INTO commit_comments (repo_id, commit_node, author_id, body, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?)
+ """,
+ (repo["id"], commit["node"], user["id"], body[:5000], now, now),
+ )
+ redirect(f"/{owner}/{repo_name}/commits/{commit['node']}")
+ return render_commit_detail(repo, path, commit)
+
+
[email protected]("/<owner>/<repo_name>/pulls")
+def repo_pull_requests(owner, repo_name):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ status = request.query.get("status", "open")
+ counts = pull_request_counts(repo["id"])
+ return render(
+ "pull_requests.tpl",
+ repo=repo,
+ pull_requests=list_pull_requests(repo["id"], status),
+ status=status if status in {"open", "closed", "merged", "all"} else "open",
+ counts=counts,
+ **repo_page_context(repo, path),
+ )
+
+
[email protected]("/<owner>/<repo_name>/pulls/new", method=["GET", "POST"])
+def new_pull_request(owner, repo_name):
+ user = require_login()
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ forks = list_user_forks_for_target(user["id"], repo["id"])
+ source_options = source_repo_ref_options(repo, include_tip=False)
+ for fork in forks:
+ source_options.extend(source_repo_ref_options(fork))
+ target_options = target_repo_ref_options(path)
+ target_option_values = {option["value"] for option in target_options}
+ selected_source_ref = request.forms.get("source_ref") if request.method == "POST" else request.query.get("source_ref")
+ selected_target_ref = request.forms.get("target_ref") if request.method == "POST" else request.query.get("target_ref")
+ if not selected_source_ref and source_options:
+ selected_source_ref = source_options[0]["value"]
+ if not selected_target_ref and target_options:
+ default_target = default_code_ref(path)
+ selected_target_ref = ref_option_value(default_target["type"], default_target.get("name", ""))
+ if selected_target_ref not in target_option_values:
+ selected_target_ref = target_options[0]["value"]
+ if selected_target_ref and selected_target_ref not in target_option_values and target_options:
+ selected_target_ref = target_options[0]["value"]
+ title_value = request.forms.get("title", "") if request.method == "POST" else ""
+ body_value = request.forms.get("body", "") if request.method == "POST" else ""
+
+ if request.method == "POST":
+ try:
+ source_repo_id, source_ref_type, source_ref_name = parse_source_ref_option_value(selected_source_ref)
+ target_ref_type, target_ref_name = parse_ref_option_value(
+ selected_target_ref,
+ allowed_types=TARGET_PULL_REQUEST_REF_TYPES,
+ )
+ except ValueError as exc:
+ return render(
+ "new_pull_request.tpl",
+ repo=repo,
+ forks=forks,
+ source_options=source_options,
+ target_options=target_options,
+ selected_source_ref=selected_source_ref,
+ selected_target_ref=selected_target_ref,
+ title_value=title_value,
+ body_value=body_value,
+ error=str(exc),
+ **repo_page_context(repo, path),
+ )
+ source_repo = get_repo_by_id(source_repo_id) if source_repo_id else None
+ title = title_value.strip()
+ body = body_value.strip()
+ if not title:
+ return render(
+ "new_pull_request.tpl",
+ repo=repo,
+ forks=forks,
+ source_options=source_options,
+ target_options=target_options,
+ selected_source_ref=selected_source_ref,
+ selected_target_ref=selected_target_ref,
+ title_value=title_value,
+ body_value=body_value,
+ error="Pull request title is required.",
+ **repo_page_context(repo, path),
+ )
+ try:
+ number = create_pull_request(
+ repo,
+ source_repo,
+ user,
+ title,
+ body,
+ source_ref_type,
+ source_ref_name,
+ target_ref_type,
+ target_ref_name,
+ )
+ redirect(f"/{owner}/{repo_name}/pulls/{number}")
+ except (ValueError, GitCommandError) as exc:
+ return render(
+ "new_pull_request.tpl",
+ repo=repo,
+ forks=forks,
+ source_options=source_options,
+ target_options=target_options,
+ selected_source_ref=selected_source_ref,
+ selected_target_ref=selected_target_ref,
+ title_value=title_value,
+ body_value=body_value,
+ error=str(exc),
+ **repo_page_context(repo, path),
+ )
+
+ return render(
+ "new_pull_request.tpl",
+ repo=repo,
+ forks=forks,
+ source_options=source_options,
+ target_options=target_options,
+ selected_source_ref=selected_source_ref,
+ selected_target_ref=selected_target_ref,
+ title_value=title_value,
+ body_value=body_value,
+ **repo_page_context(repo, path),
+ )
+
+
[email protected]("/<owner>/<repo_name>/pulls/<number:int>", method=["GET", "POST"])
+def pull_request_detail(owner, repo_name, number):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ pr = get_pull_request(repo["id"], number)
+ if not pr:
+ abort(404, "Pull request not found.")
+
+ if request.method == "POST":
+ user = require_login()
+ action = request.forms.get("action")
+ now = utcnow()
+ if action == "comment":
+ body = request.forms.get("body", "").strip()
+ if not body:
+ return render_pull_request_detail(
+ repo,
+ path,
+ pr,
+ error="Comment body is required.",
+ comment_value=body,
+ )
+ with db_connect() as conn:
+ conn.execute(
+ """
+ INSERT INTO pull_request_comments (pull_request_id, author_id, body, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?)
+ """,
+ (pr["id"], user["id"], body[:5000], now, now),
+ )
+ conn.execute(
+ "UPDATE pull_requests SET updated_at = ? WHERE id = ?",
+ (now, pr["id"]),
+ )
+ redirect(f"/{owner}/{repo_name}/pulls/{number}")
+
+ if not user_can_maintain_repo(user, repo):
+ abort(403, "Only maintainers can update pull requests.")
+ if action == "merge":
+ try:
+ merge_pull_request(pr, user)
+ redirect(f"/{owner}/{repo_name}/pulls/{number}")
+ except (ValueError, GitCommandError) as exc:
+ return render_pull_request_detail(repo, path, pr, error=str(exc))
+ if action == "close" and pr["status"] == "open":
+ now = utcnow()
+ with db_connect() as conn:
+ conn.execute(
+ """
+ UPDATE pull_requests
+ SET status = 'closed', updated_at = ?, closed_at = ?
+ WHERE id = ?
+ """,
+ (now, now, pr["id"]),
+ )
+ redirect(f"/{owner}/{repo_name}/pulls/{number}")
+
+ return render_pull_request_detail(repo, path, pr)
+
+
[email protected]("/<owner>/<repo_name>/issues")
+def repo_issues(owner, repo_name):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ status = request.query.get("status", "open")
+ counts = issue_counts(repo["id"])
+ context = repo_page_context(repo, path)
+ return render(
+ "issues.tpl",
+ repo=repo,
+ issues=list_issues(repo["id"], status),
+ status=status if status in {"open", "closed", "all"} else "open",
+ counts=counts,
+ **context,
+ )
+
+
[email protected]("/<owner>/<repo_name>/issues/new", method=["GET", "POST"])
+def new_issue(owner, repo_name):
+ user = require_login()
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+
+ if request.method == "GET":
+ return render("new_issue.tpl", repo=repo, title_value="", body_value="", **repo_page_context(repo, path))
+
+ title = request.forms.get("title", "").strip()
+ body = request.forms.get("body", "").strip()
+ if not title:
+ return render(
+ "new_issue.tpl",
+ repo=repo,
+ title_value=title,
+ body_value=body,
+ error="Issue title is required.",
+ **repo_page_context(repo, path),
+ )
+
+ now = utcnow()
+ with db_connect() as conn:
+ number = conn.execute(
+ "SELECT COALESCE(MAX(number), 0) + 1 FROM issues WHERE repo_id = ?",
+ (repo["id"],),
+ ).fetchone()[0]
+ conn.execute(
+ """
+ INSERT INTO issues (repo_id, author_id, number, title, body, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ """,
+ (repo["id"], user["id"], number, title[:200], body[:5000], now, now),
+ )
+ redirect(f"/{owner}/{repo_name}/issues/{number}")
+
+
[email protected]("/<owner>/<repo_name>/issues/<number:int>", method=["GET", "POST"])
+def issue_detail(owner, repo_name, number):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+ path = repo_path(owner, repo_name)
+ issue = get_issue(repo["id"], number)
+ if not issue:
+ abort(404, "Issue not found.")
+
+ if request.method == "POST":
+ user = require_login()
+ action = request.forms.get("action")
+ now = utcnow()
+ if action == "comment":
+ body = request.forms.get("body", "").strip()
+ if not body:
+ return render(
+ "issue_detail.tpl",
+ repo=repo,
+ issue=issue,
+ comments=list_issue_comments(issue["id"]),
+ comment_value=body,
+ error="Comment body is required.",
+ **repo_page_context(repo, path),
+ )
+ with db_connect() as conn:
+ conn.execute(
+ """
+ INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?)
+ """,
+ (issue["id"], user["id"], body[:5000], now, now),
+ )
+ conn.execute(
+ "UPDATE issues SET updated_at = ? WHERE id = ?",
+ (now, issue["id"]),
+ )
+ else:
+ if not user_can_maintain_repo(user, repo):
+ abort(403, "Only maintainers can update issues.")
+ if action == "close" and issue["status"] != "closed":
+ with db_connect() as conn:
+ conn.execute(
+ "UPDATE issues SET status = 'closed', updated_at = ?, closed_at = ? WHERE id = ?",
+ (now, now, issue["id"]),
+ )
+ elif action == "reopen" and issue["status"] != "open":
+ with db_connect() as conn:
+ conn.execute(
+ "UPDATE issues SET status = 'open', updated_at = ?, closed_at = NULL WHERE id = ?",
+ (now, issue["id"]),
+ )
+ redirect(f"/{owner}/{repo_name}/issues/{number}")
+
+ return render(
+ "issue_detail.tpl",
+ repo=repo,
+ issue=issue,
+ comments=list_issue_comments(issue["id"]),
+ comment_value="",
+ **repo_page_context(repo, path),
+ )
+
+
[email protected]("/git/<owner>/<repo_name>", method=["GET", "POST"])
[email protected]("/git/<owner>/<repo_name>/<git_path:path>", method=["GET", "POST"])
+def git_http(owner, repo_name, git_path=""):
+ repo = get_repo(owner, repo_name)
+ if not repo:
+ abort(404, "Repository not found.")
+
+ is_write = is_git_write_request()
+ auth_user, auth_error = parse_basic_auth("git" if is_write else None, clear_on_success=not is_write)
+ if is_write:
+ if auth_error == "rate_limited":
+ return too_many_requests_response()
+ if auth_error:
+ return basic_auth_challenge("Invalid Git credentials.")
+ if not auth_user:
+ return basic_auth_challenge()
+ if not user_can_maintain_repo(auth_user, repo):
+ record_auth_failure("git", auth_user["username"])
+ return HTTPResponse(
+ "Push not authorized for this repository.\n",
+ status=403,
+ content_type="text/plain; charset=utf-8",
+ )
+ clear_auth_failures("git", auth_user["username"])
+ prepare_repo_for_receive(repo_path(owner, repo_name))
+
+ return git_http_backend_response(repo, auth_user)
+
+
[email protected](404)
+def not_found(error):
+ return render("error.tpl", title="Not found", message=getattr(error, "body", "Not found."))
+
+
[email protected](403)
+def forbidden(error):
+ return render("error.tpl", title="Forbidden", message=getattr(error, "body", "Forbidden."))
+
+
[email protected](413)
+def request_too_large(error):
+ return render("error.tpl", title="Request too large", message=getattr(error, "body", "Request body too large."))
+
+
[email protected](500)
+def server_error(error):
+ return render("error.tpl", title="Server error", message="Something went wrong.")
+
+
+class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
+ daemon_threads = True
+
+
+validate_startup_config()
+init_db()
+
+
+if __name__ == "__main__":
+ port = int(os.environ.get("PORT", "8080"))
+ run(
+ app,
+ host="0.0.0.0",
+ port=port,
+ debug=DEBUG,
+ reloader=DEBUG,
+ server="wsgiref",
+ server_class=ThreadingWSGIServer,
+ )
diff --git a/gitignore b/gitignore
new file mode 100644
index 0000000..632cf46
--- /dev/null
+++ b/gitignore
@@ -0,0 +1,4 @@
+__pycache__/
+.env
+data/
+.pytest_cache/
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..b522736
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,4 @@
+bottle>=0.13,<0.14
+bleach>=6.1,<7
+Markdown>=3.8,<4
+gunicorn
diff --git a/static/icon.png b/static/icon.png
new file mode 100644
index 0000000..b1fbd73
Binary files /dev/null and b/static/icon.png differ
diff --git a/static/logo.png b/static/logo.png
new file mode 100644
index 0000000..b1fbd73
Binary files /dev/null and b/static/logo.png differ
diff --git a/static/styles.css b/static/styles.css
new file mode 100644
index 0000000..3cd6fef
--- /dev/null
+++ b/static/styles.css
@@ -0,0 +1,171 @@
+* { box-sizing: border-box; }
+
+:root {
+ color-scheme: light;
+ --page-bg: #fff;
+ --text: #111;
+ --link: #0645ad;
+ --surface: #fff;
+ --soft-surface: #fafafa;
+ --button-surface: #eee;
+ --border: #ddd;
+ --soft-border: #eee;
+ --strong-border: #999;
+ --control-border: #aaa;
+ --menu-border: #bbb;
+ --muted: #666;
+ --detail: #333;
+ --danger: #900;
+ --notice: #060;
+ --ref-hover: #eef4fb;
+ --shadow: rgba(0, 0, 0, .12);
+}
+
+@media (prefers-color-scheme: dark) {
+ :root {
+ color-scheme: dark;
+ --page-bg: #101214;
+ --text: #e8e8e3;
+ --link: #7fb4ff;
+ --surface: #15191d;
+ --soft-surface: #1b2025;
+ --button-surface: #242a30;
+ --border: #343a40;
+ --soft-border: #2a3036;
+ --strong-border: #6b727a;
+ --control-border: #6b727a;
+ --menu-border: #464d55;
+ --muted: #a3a9b0;
+ --detail: #d6d8d5;
+ --danger: #ff8b8b;
+ --notice: #7fd18b;
+ --ref-hover: #233246;
+ --shadow: rgba(0, 0, 0, .45);
+ }
+}
+
+body {
+ max-width: 960px;
+ margin: 2rem auto;
+ padding: 0 1rem 3rem;
+ color: var(--text);
+ background: var(--page-bg);
+ font: 14px/1.55 ui-monospace, SFMono-Regular, Menlo, Consolas, "Liberation Mono", monospace;
+}
+
+a { color: var(--link); text-decoration: underline; }
+h1, h2, p, pre, ul, table, form { margin-top: 0; }
+pre, code, input, textarea, select, button { font: inherit; }
+pre, .clone-box code { overflow-x: auto; padding: .6rem; border: 1px solid var(--border); background: var(--soft-surface); }
+pre code.hljs { padding: 0; background: transparent; }
+input, textarea, select { width: 100%; padding: .4rem; color: var(--text); border: 1px solid var(--control-border); background: var(--surface); }
+table { width: 100%; border-collapse: collapse; }
+th, td { padding: .35rem .5rem; border: 1px solid var(--border); text-align: left; }
+
+button, .button {
+ display: inline-block;
+ padding: .3rem .55rem;
+ color: var(--text);
+ background: var(--button-surface);
+ border: 1px solid var(--strong-border);
+ text-decoration: none;
+ cursor: pointer;
+}
+
+.site-header, .repo-header, .profile-header, .panel, .auth-card {
+ margin-bottom: 1.5rem;
+ padding-bottom: 1rem;
+ border-bottom: 1px solid var(--border);
+}
+
+.site-header, .nav, .repo-tabs, .filters, .tabs, .panel-heading, .hero-actions, .breadcrumb, .repo-actions, .ref-actions {
+ display: flex;
+ gap: .75rem;
+ align-items: baseline;
+ flex-wrap: wrap;
+}
+
+.site-header, .panel-heading, .repo-header { justify-content: space-between; }
+.repo-title-row { display: flex; gap: .75rem; align-items: baseline; flex-wrap: wrap; }
+.repo-title-row h1 { margin-bottom: 0; }
+.brand { color: var(--text); font-weight: 700; text-decoration: none; }
+.nav form, .repo-tabs form, .inline-form { display: inline; }
+.link-button { padding: 0; color: var(--link); background: none; border: 0; text-decoration: underline; }
+.repo-tabs .repo-tab { padding-bottom: .2rem; border-bottom: 2px solid transparent; }
+.repo-tabs .repo-tab.active { color: var(--text); font-weight: 700; text-decoration: none; }
+.ref-picker { position: relative; display: inline-block; }
+.ref-picker-menu {
+ position: absolute;
+ left: 0;
+ top: calc(100% + .4rem);
+ z-index: 20;
+ width: 20rem;
+ max-width: calc(100vw - 2rem);
+ padding: .5rem;
+ border: 1px solid var(--menu-border);
+ background: var(--surface);
+ box-shadow: 0 .4rem 1.2rem var(--shadow);
+}
+.ref-picker-search { margin-bottom: .4rem; }
+.ref-picker-options { display: grid; gap: .1rem; max-height: 18rem; overflow-y: auto; }
+.ref-picker-option {
+ display: flex;
+ gap: 1rem;
+ justify-content: space-between;
+ padding: .35rem .45rem;
+ color: var(--text);
+ text-decoration: none;
+}
+.ref-picker-option:hover, .ref-picker-option:focus, .ref-picker-option.active {
+ background: var(--ref-hover);
+ text-decoration: none;
+}
+.ref-picker-option.active { font-weight: 700; }
+.ref-picker-current { color: var(--muted); font-weight: 400; }
+.ref-picker-empty { padding: .35rem .45rem; color: var(--muted); }
+.ref-picker-footer {
+ display: flex;
+ gap: .75rem;
+ flex-wrap: wrap;
+ margin-top: .5rem;
+ padding-top: .45rem;
+ border-top: 1px solid var(--border);
+}
+.ref-picker-link.active { color: var(--text); font-weight: 700; text-decoration: none; }
+.filters .active, .tabs .active { color: var(--text); font-weight: 700; text-decoration: none; }
+.hero, .repo-tabs { margin-bottom: 1.5rem; }
+.eyebrow, .muted, .empty, .nav-user, .repo-card small, .issue-list span, .commit-list span, .file-list span, .clean-list span, .file-kind { color: var(--muted); }
+.repo-list, .stack, form, .clean-list, .commit-list, .file-list, .issue-list { display: grid; gap: .75rem; }
+.repo-card { display: block; padding: .75rem 0; border-bottom: 1px solid var(--soft-border); color: var(--text); }
+.repo-card strong, .repo-card span, .repo-card small, .clone-box code { display: block; }
+.markdown-body { overflow-x: auto; }
+.markdown-body img { max-width: 100%; }
+.grid.two { display: grid; grid-template-columns: 1fr 18rem; gap: 2rem; }
+.meta-list { display: grid; grid-template-columns: 8rem 1fr; gap: .35rem 1rem; }
+.meta-list dt { color: var(--muted); }
+.meta-list dd { margin: 0; overflow-x: auto; }
+.activity-feed { display: grid; gap: 1rem; padding-left: 0; list-style: none; }
+.activity-feed li { padding-bottom: 1rem; border-bottom: 1px solid var(--soft-border); }
+.activity-title, .activity-detail { margin-bottom: .25rem; }
+.activity-detail { color: var(--detail); }
+.comment-list { display: grid; gap: 1rem; }
+.comment { border-bottom: 1px solid var(--soft-border); }
+.diff { max-height: 70vh; }
+.file-table { width: 100%; border-collapse: collapse; }
+.file-table td { padding: .4rem 0; border-bottom: 1px solid var(--soft-border); }
+.file-kind { width: 4rem; }
+.commit-list, .file-list, .issue-list, .clean-list { padding-left: 0; list-style: none; }
+.commit-list li, .file-list li { display: grid; grid-template-columns: 8rem 1fr; gap: 1rem; }
+.tag-list li { display: grid; grid-template-columns: 1fr 18rem; gap: 1.5rem; padding-bottom: 1rem; border-bottom: 1px solid var(--soft-border); }
+.tag-actions { display: grid; gap: .75rem; align-content: start; }
+.alert { margin-bottom: 1rem; color: var(--danger); }
+.notice { margin-bottom: 1rem; color: var(--notice); }
+.danger-zone { color: var(--danger); }
+.button.danger { color: var(--danger); border-color: var(--danger); }
+.button-link { background: none; border: none; padding: 0; color: var(--link); text-decoration: underline; cursor: pointer; font: inherit; }
+.ref-picker-toggle { display: inline-flex; gap: .25rem; align-items: center; background: none; border: none; padding: 0; color: var(--muted); cursor: pointer; font: inherit; font-size: smaller; }
+
+@media (max-width: 760px) {
+ body { margin-top: 1rem; }
+ .grid.two, .commit-list li, .tag-list li { grid-template-columns: 1fr; }
+}
diff --git a/templates/base.tpl b/templates/base.tpl
new file mode 100644
index 0000000..db1de80
--- /dev/null
+++ b/templates/base.tpl
@@ -0,0 +1,134 @@
+<!doctype html>
+<html lang="en">
+<head>
+ <meta charset="utf-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+ <title>{{title or "GitMan"}}</title>
+ <link rel="icon" type="image/png" sizes="32x32" href="/static/icon.png">
+<link rel="icon" type="image/png" sizes="16x16" href="/static/icon.png">
+<link rel="apple-touch-icon" sizes="180x180" href="/static/icon.png">
+ <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/github.min.css">
+ <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/styles/github-dark.min.css" media="(prefers-color-scheme: dark)">
+ <link rel="stylesheet" href="/static/styles.css">
+</head>
+<body>
+ <header class="site-header">
+ <a class="brand" href="/"><img src="/static/logo.png" style="max-height:36px;"></a>
+ <nav class="nav">
+ % if user:
+ <a href="/{{user['username']}}">{{user['username']}}</a>
+ <a href="/new">New repo</a>
+ <form action="/logout" method="post">
+ {{!csrf_field()}}
+ <button class="link-button" type="submit">Log out</button>
+ </form>
+ % else:
+ <a href="/login">Log in</a>
+ <a class="button small" href="/signup">Sign up</a>
+ % end
+ </nav>
+ </header>
+
+ <main class="page">
+ % if error:
+ <div class="alert">{{error}}</div>
+ % end
+ % if notice:
+ <div class="notice">{{notice}}</div>
+ % end
+ {{!base}}
+ </main>
+ <script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.9.0/highlight.min.js"></script>
+ <script>
+ if (window.hljs) {
+ document.querySelectorAll("pre code").forEach((block) => hljs.highlightElement(block));
+ }
+ </script>
+ <script>
+ (() => {
+ const pickers = document.querySelectorAll("[data-ref-picker]");
+ if (!pickers.length) return;
+
+ const applyFilter = (picker) => {
+ const search = picker.querySelector("[data-ref-picker-search]");
+ const options = picker.querySelectorAll("[data-ref-picker-option]");
+ const empty = picker.querySelector("[data-ref-picker-empty]");
+ const query = search ? search.value.trim().toLowerCase() : "";
+ let visibleCount = 0;
+
+ options.forEach((option) => {
+ const label = option.dataset.refLabel || "";
+ const isVisible = query
+ ? label.includes(query)
+ : option.dataset.refInitial === "true";
+ option.hidden = !isVisible;
+ if (isVisible) visibleCount += 1;
+ });
+
+ if (empty) empty.hidden = visibleCount > 0;
+ };
+
+ const resetFilter = (picker) => {
+ const search = picker.querySelector("[data-ref-picker-search]");
+ if (search) search.value = "";
+ applyFilter(picker);
+ };
+
+ const closePicker = (picker) => {
+ const button = picker.querySelector(".ref-picker-toggle");
+ const menu = picker.querySelector("[data-ref-picker-menu]");
+ if (!menu || menu.hidden) return;
+ menu.hidden = true;
+ if (button) button.setAttribute("aria-expanded", "false");
+ };
+
+ const openPicker = (picker) => {
+ pickers.forEach((other) => {
+ if (other !== picker) closePicker(other);
+ });
+ const button = picker.querySelector(".ref-picker-toggle");
+ const menu = picker.querySelector("[data-ref-picker-menu]");
+ const search = picker.querySelector("[data-ref-picker-search]");
+ if (!menu) return;
+ resetFilter(picker);
+ menu.hidden = false;
+ if (button) button.setAttribute("aria-expanded", "true");
+ if (search) search.focus();
+ };
+
+ pickers.forEach((picker) => {
+ const button = picker.querySelector(".ref-picker-toggle");
+ const search = picker.querySelector("[data-ref-picker-search]");
+
+ if (button) {
+ button.addEventListener("click", () => {
+ const menu = picker.querySelector("[data-ref-picker-menu]");
+ if (menu && menu.hidden) {
+ openPicker(picker);
+ } else {
+ closePicker(picker);
+ }
+ });
+ }
+
+ if (search) {
+ search.addEventListener("input", () => {
+ applyFilter(picker);
+ });
+ }
+ });
+
+ document.addEventListener("click", (event) => {
+ pickers.forEach((picker) => {
+ if (!picker.contains(event.target)) closePicker(picker);
+ });
+ });
+
+ document.addEventListener("keydown", (event) => {
+ if (event.key !== "Escape") return;
+ pickers.forEach(closePicker);
+ });
+ })();
+ </script>
+</body>
+</html>
diff --git a/templates/branches.tpl b/templates/branches.tpl
new file mode 100644
index 0000000..0ff3853
--- /dev/null
+++ b/templates/branches.tpl
@@ -0,0 +1,36 @@
+% rebase("base.tpl", title=repo["owner_username"] + "/" + repo["name"] + " branches", user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ <p class="muted">Branches</p>
+
+ </div>
+</section>
+
+<section class="panel">
+ % if branches:
+ <ul class="commit-list">
+ % for branch in branches:
+ <li>
+ <code>{{branch["name"]}}</code>
+ <div style="margin-bottom:20px;">
+ <strong><a href="/{{repo['owner_username']}}/{{repo['name']}}/commits/{{branch['short_node']}}">{{branch["short_node"]}}</a></strong>
+ <small>commit {{branch["short_node"]}} · {{branch["date"]}}</small>
+ <p>{{branch["summary"]}}
+ <small><a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src', branch, True)}}">Browse code</a> ·
+ <a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/commits', branch, True)}}">Commits</a> ·
+ <a href="/{{repo['owner_username']}}/{{repo['name']}}/archive/{{branch['short_node']}}.zip">Archive</a></small>
+ </p>
+ </div>
+ </li>
+ % end
+ </ul>
+ % else:
+ <p class="empty">No branches yet.</p>
+ % end
+</section>
diff --git a/templates/commit_detail.tpl b/templates/commit_detail.tpl
new file mode 100644
index 0000000..7686f1c
--- /dev/null
+++ b/templates/commit_detail.tpl
@@ -0,0 +1,68 @@
+% rebase("base.tpl", title=commit["short_node"] + " at " + repo["owner_username"] + "/" + repo["name"], user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ <h2>{{commit["description"].splitlines()[0] if commit["description"] else commit["short_node"]}}</h2>
+ <p class="muted">Commit {{commit["short_node"]}} · {{commit["author"]}} · {{commit["date"]}}</p>
+ <dl class="meta-list">
+ <dt>Changeset</dt>
+ <dd>
+ <code>{{commit["node"]}}</code>
+ </dd>
+ % if commit["parents"]:
+ <dt>Parents</dt>
+ <dd><code>{{commit["parents"]}}</code></dd>
+ % end
+ </dl>
+ <p class="muted"><a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src', commit_source_ref, True)}}">View source at this commit</a></p>
+ % if "\n" in commit["description"]:
+ <pre class="readme">{{commit["description"]}}</pre>
+ % end
+</section>
+
+<section class="panel">
+ <h2>Comments</h2>
+ % if comments:
+ <div class="comment-list">
+ % for comment in comments:
+ <article class="comment">
+ <p><strong><a href="/{{comment["author_username"]}}">@{{comment["author_username"]}}</a>:</strong> {{!render_markdown_links(comment["body"])}} <small class="muted">{{comment["created_at"]}}</small></p>
+ </article>
+ % end
+ </div>
+ % else:
+ <p class="empty">No comments yet.</p>
+ % end
+
+ % if user:
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="comment">
+ <label>
+ Add a comment
+ <textarea name="body" rows="5">{{comment_value}}</textarea>
+ </label>
+ <button class="button" type="submit">Comment</button>
+ </form>
+ % else:
+ <p><a href="/login?next=/{{repo['owner_username']}}/{{repo['name']}}/commits/{{commit['node']}}">Log in to comment</a></p>
+ % end
+</section>
+
+<section class="panel">
+ <h2>Diff</h2>
+ % if diff:
+ <pre class="diff"><code class="language-diff">{{diff}}</code></pre>
+ % else:
+ <p class="empty">No diff for this commit.</p>
+ % end
+</section>
diff --git a/templates/commits.tpl b/templates/commits.tpl
new file mode 100644
index 0000000..b882b19
--- /dev/null
+++ b/templates/commits.tpl
@@ -0,0 +1,30 @@
+% rebase("base.tpl", title=repo["owner_username"] + "/" + repo["name"] + " commits", user=user, error=error, notice=notice)
+
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ % if commits:
+ <ul class="commit-list">
+ % for commit in commits:
+ <li>
+ <code><a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/commits/' + commit['node'], selected_ref)}}">{{commit["short_node"]}}</a></code>
+ <div>
+ <strong>{{commit["summary"]}}</strong>
+ <small>{{commit["author"]}} · {{commit["date"]}}</small>
+ </div>
+ </li>
+ % end
+ </ul>
+ % else:
+ <p class="empty">No commits yet.</p>
+ % end
+</section>
diff --git a/templates/edit_profile.tpl b/templates/edit_profile.tpl
new file mode 100644
index 0000000..94c34ac
--- /dev/null
+++ b/templates/edit_profile.tpl
@@ -0,0 +1,23 @@
+% rebase("base.tpl", title="Edit profile", user=user, error=error, notice=notice)
+
+<section class="auth-card wide">
+ <h1>Edit profile</h1>
+ <form method="post">
+ {{!csrf_field()}}
+ <label>
+ Display name
+ <input name="display_name" value="{{profile['display_name']}}" maxlength="80">
+ </label>
+ <label>
+ Bio
+ <textarea name="bio" rows="6" maxlength="1000">{{profile["bio"]}}</textarea>
+ </label>
+ <label>
+ Website
+ <input name="website" value="{{profile['website']}}" placeholder="https://example.com" maxlength="255">
+ </label>
+ <button class="button" type="submit">Save profile</button>
+ </form>
+ <br><br>
+ <p class="muted"><a href="/{{profile['username']}}"><- Back</a></p>
+</section>
diff --git a/templates/error.tpl b/templates/error.tpl
new file mode 100644
index 0000000..a81cb69
--- /dev/null
+++ b/templates/error.tpl
@@ -0,0 +1,7 @@
+% rebase("base.tpl", title=title, user=user, error=error, notice=notice)
+
+<section class="auth-card">
+ <h1>{{title}}</h1>
+ <p>{{message}}</p>
+ <a class="button secondary" href="/">Go home</a>
+</section>
diff --git a/templates/file.tpl b/templates/file.tpl
new file mode 100644
index 0000000..279c972
--- /dev/null
+++ b/templates/file.tpl
@@ -0,0 +1,37 @@
+% rebase("base.tpl", title=file_path + " at " + repo["owner_username"] + "/" + repo["name"], user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ <div class="breadcrumb">
+ <a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src', selected_ref)}}">root</a>
+ % parts = file_path.split("/")
+ % running = ""
+ % for index, part in enumerate(parts):
+ % running = part if not running else running + "/" + part
+ <span>/</span>
+ % if index + 1 == len(parts):
+ <span>{{part}}</span>
+ % else:
+ <a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src/' + quote_path(running), selected_ref)}}">{{part}}</a>
+ % end
+ % end
+ <a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/raw/' + quote_path(file_path), selected_ref)}}">[View Raw]</a>
+ </div>
+ </div>
+</section>
+
+<section class="panel">
+ % if is_binary:
+ <p class="empty">Binary file, {{size}} bytes. Use the raw view to download it.</p>
+ % else:
+ % if preview_truncated:
+ <p class="notice">File preview truncated. Use the raw view to download the full file.</p>
+ % end
+ <pre class="code"><code class="{{language_class}}">{{content}}</code></pre>
+ % end
+</section>
diff --git a/templates/fork_repo.tpl b/templates/fork_repo.tpl
new file mode 100644
index 0000000..74f8d1d
--- /dev/null
+++ b/templates/fork_repo.tpl
@@ -0,0 +1,17 @@
+% rebase("base.tpl", title="Fork " + source_repo["owner_username"] + "/" + source_repo["name"], user=user, error=error, notice=notice)
+
+<section class="auth-card wide">
+ <h1>Fork {{source_repo["owner_username"]}}/{{source_repo["name"]}}</h1>
+ <form method="post">
+ {{!csrf_field()}}
+ <label>
+ Repository name
+ <input name="name" value="{{name}}" required pattern="[a-z0-9][a-z0-9._-]{1,62}">
+ </label>
+ <label>
+ Description
+ <textarea name="description" rows="3">{{description}}</textarea>
+ </label>
+ <button class="button" type="submit">Fork repository</button>
+ </form>
+</section>
diff --git a/templates/index.tpl b/templates/index.tpl
new file mode 100644
index 0000000..0c5c2e0
--- /dev/null
+++ b/templates/index.tpl
@@ -0,0 +1,53 @@
+% rebase("base.tpl", title="GitMan activity", user=user, error=error, notice=notice)
+
+<section class="panel">
+ <div class="panel-heading">
+ <div>
+ % if user:
+ <p class="eyebrow">Recent activity</p>
+ <h1>Feed</h1>
+ % else:
+ <p class ="eyebrow">Git hosting</p>
+ <h1>Free Git repository hosting for open source software</h1>
+ <div class="hero-actions" style="margin-bottom:50px;">
+ <a class="button" href="/signup">Create an account</a>
+ <a class="button secondary" href="/login">Log in</a>
+ </div>
+ % end
+ </div>
+ </div>
+
+ % if actions:
+ % if not user:
+ <h1>Recent Activity Feed</h1>
+ % end
+ <ol class="activity-feed">
+ % for action in actions:
+ % repo_url = "/" + action["repo_owner_username"] + "/" + action["repo_name"] if action["repo_owner_username"] and action["repo_name"] else ""
+ % show_repo_context = repo_url and action["target_url"] != repo_url
+ <li>
+ <p class="activity-title">
+ % if action["actor_url"]:
+ <strong><a href="{{action['actor_url']}}">{{action["actor_label"]}}</a></strong>
+ % else:
+ <strong>{{action["actor_label"]}}</strong>
+ % end
+ {{action["summary"]}}
+ <a href="{{action['target_url']}}">{{action["target_label"]}}</a>
+ </p>
+ <p class="muted">
+ % if show_repo_context:
+ in <a href="{{repo_url}}">{{action["repo_owner_username"]}}/{{action["repo_name"]}}</a> ·
+ % end
+ {{action["occurred_at"]}}
+ </p>
+ % if action["detail"]:
+ <p class="activity-detail">{{!render_markdown_links(action["detail"])}}</p>
+ % end
+ </li>
+ % end
+ </ol>
+ % else:
+ <p class="empty">No activity yet.</p>
+ % end
+</section>
diff --git a/templates/issue_detail.tpl b/templates/issue_detail.tpl
new file mode 100644
index 0000000..3e452b6
--- /dev/null
+++ b/templates/issue_detail.tpl
@@ -0,0 +1,66 @@
+% rebase("base.tpl", title="#" + str(issue["number"]) + " " + issue["title"], user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ <div class="panel-heading">
+ <div>
+ <h2>(#{{issue["number"]}}) {{issue["title"]}}</h2>
+ <p class="muted"><strong>{{issue["status"]}}</strong> by {{issue["author_username"]}} on {{issue["created_at"]}}</p>
+ </div>
+ % if can_maintain:
+ <form method="post">
+ {{!csrf_field()}}
+ % if issue["status"] == "open":
+ <input type="hidden" name="action" value="close">
+ <button class="button secondary small" type="submit">Close issue</button>
+ % else:
+ <input type="hidden" name="action" value="reopen">
+ <button class="button secondary small" type="submit">Reopen issue</button>
+ % end
+ </form>
+ % end
+ </div>
+ % if issue["body"]:
+ <span>{{issue["body"]}}</span>
+ % else:
+ <p class="empty">No description.</p>
+ % end
+</section>
+
+<section class="panel">
+ <h2>Comments</h2>
+ % if comments:
+ <div class="comment-list">
+ % for comment in comments:
+ <article class="comment">
+ <p><strong><a href="/{{comment["author_username"]}}">@{{comment["author_username"]}}</a>:</strong> {{!render_markdown_links(comment["body"])}} <small class="muted">{{comment["created_at"]}}</small></p>
+ </article>
+ % end
+ </div>
+ % else:
+ <p class="empty">No comments yet.</p>
+ % end
+
+ % if user:
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="comment">
+ <label>
+ Add a comment
+ <textarea name="body" rows="5">{{comment_value}}</textarea>
+ </label>
+ <button class="button" type="submit">Comment</button>
+ </form>
+ % else:
+ <p><a href="/login?next=/{{repo['owner_username']}}/{{repo['name']}}/issues/{{issue['number']}}">Log in to comment</a></p>
+ % end
+</section>
diff --git a/templates/issues.tpl b/templates/issues.tpl
new file mode 100644
index 0000000..27ab2ab
--- /dev/null
+++ b/templates/issues.tpl
@@ -0,0 +1,38 @@
+% rebase("base.tpl", title=repo["owner_username"] + "/" + repo["name"] + " issues", user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ <div class="panel-heading">
+ <h2>Issues</h2>
+ <div class="filters">
+ <a class="{{'active' if status == 'open' else ''}}" href="?status=open">Open ({{counts["open"]}})</a>
+ <a class="{{'active' if status == 'closed' else ''}}" href="?status=closed">Closed ({{counts["closed"]}})</a>
+ <a class="{{'active' if status == 'all' else ''}}" href="?status=all">All</a>
+ % if user:
+ <a href="/{{repo['owner_username']}}/{{repo['name']}}/issues/new">New issue</a>
+ % else:
+ <a href="/login?next=/{{repo['owner_username']}}/{{repo['name']}}/issues/new">New issue</a>
+ % end
+ </div>
+ </div>
+ % if issues:
+ <ul class="issue-list">
+ % for issue in issues:
+ <li>
+ <a href="/{{repo['owner_username']}}/{{repo['name']}}/issues/{{issue['number']}}">#{{issue["number"]}} {{issue["title"]}}</a> <span>({{issue["status"]}})</span>
+ </li>
+ % end
+ </ul>
+ % else:
+ <p class="empty">No {{status}} issues.</p>
+ % end
+</section>
diff --git a/templates/login.tpl b/templates/login.tpl
new file mode 100644
index 0000000..cf477f6
--- /dev/null
+++ b/templates/login.tpl
@@ -0,0 +1,19 @@
+% rebase("base.tpl", title="Log in", user=user, error=error, notice=notice)
+
+<section class="auth-card">
+ <h1>Log in</h1>
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="next" value="{{next_url}}">
+ <label>
+ Username
+ <input name="username" value="{{username if defined('username') else ''}}" required autocomplete="username">
+ </label>
+ <label>
+ Password
+ <input name="password" type="password" required autocomplete="current-password">
+ </label>
+ <button class="button" type="submit">Log in</button>
+ </form>
+ <p class="muted">Need an account? <a href="/signup">Sign up</a>.</p>
+</section>
diff --git a/templates/new_issue.tpl b/templates/new_issue.tpl
new file mode 100644
index 0000000..9ea32ab
--- /dev/null
+++ b/templates/new_issue.tpl
@@ -0,0 +1,27 @@
+% rebase("base.tpl", title="New issue", user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ <h2>Open a new issue</h2>
+ <form method="post">
+ {{!csrf_field()}}
+ <label>
+ Title
+ <input name="title" value="{{title_value}}" required maxlength="200">
+ </label>
+ <label>
+ Body
+ <textarea name="body" rows="8">{{body_value}}</textarea>
+ </label>
+ <button class="button" type="submit">Open issue</button>
+ </form>
+</section>
diff --git a/templates/new_pull_request.tpl b/templates/new_pull_request.tpl
new file mode 100644
index 0000000..fd19666
--- /dev/null
+++ b/templates/new_pull_request.tpl
@@ -0,0 +1,55 @@
+% rebase("base.tpl", title="New pull request", user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ % if source_options and target_options:
+ <h2>Open pull request</h2>
+ <form method="post">
+ {{!csrf_field()}}
+ <label>
+ Source ref
+ <select name="source_ref">
+ % for option in source_options:
+ <option value="{{option['value']}}" {{"selected" if option["value"] == selected_source_ref else ""}}>{{option["label"]}}</option>
+ % end
+ </select>
+ </label>
+ <label>
+ Target ref
+ <select name="target_ref">
+ % for option in target_options:
+ <option value="{{option['value']}}" {{"selected" if option["value"] == selected_target_ref else ""}}>{{option["label"]}}</option>
+ % end
+ </select>
+ </label>
+ <label>
+ Title
+ <input name="title" value="{{title_value}}" required maxlength="200">
+ </label>
+ <label>
+ Body
+ <textarea name="body" rows="8">{{body_value}}</textarea>
+ </label>
+ <button class="button" type="submit">Open pull request</button>
+ </form>
+ % elif source_options:
+ <p class="empty">This repository has no target branches.</p>
+ % else:
+ <p class="empty">This repository has no source branches yet.</p>
+ <form class="inline-form" method="post" action="/{{repo['owner_username']}}/{{repo['name']}}/fork">
+ {{!csrf_field()}}
+ <input type="hidden" name="name" value="{{repo['name']}}">
+ <input type="hidden" name="description" value="{{repo['description']}}">
+ <button class="button" type="submit">Fork repository</button>
+ </form>
+ % end
+</section>
diff --git a/templates/new_repo.tpl b/templates/new_repo.tpl
new file mode 100644
index 0000000..118bce8
--- /dev/null
+++ b/templates/new_repo.tpl
@@ -0,0 +1,18 @@
+% rebase("base.tpl", title="New repository", user=user, error=error, notice=notice)
+
+<section class="auth-card wide">
+ <h1>New repository</h1>
+ <form method="post">
+ {{!csrf_field()}}
+ <label>
+ Repository name
+ <input name="name" value="{{name if defined('name') else ''}}" required pattern="[a-z0-9][a-z0-9._-]{1,62}">
+ </label>
+ <label>
+ Description
+ <textarea name="description" rows="3">{{description if defined("description") else ""}}</textarea>
+ </label>
+ <p class="muted">Repositories are public to clone and browse. You can add contributors after creation.</p>
+ <button class="button" type="submit">Create repository</button>
+ </form>
+</section>
diff --git a/templates/profile.tpl b/templates/profile.tpl
new file mode 100644
index 0000000..65065bf
--- /dev/null
+++ b/templates/profile.tpl
@@ -0,0 +1,45 @@
+% profile_name = profile_user["display_name"] or profile_user["username"]
+% rebase("base.tpl", title=profile_name, user=user, error=error, notice=notice)
+
+<section class="profile-header">
+ <h1>{{profile_name}}</h1>
+ % if profile_user["display_name"]:
+ <p class="muted">@{{profile_user["username"]}}</p>
+ % end
+ % if profile_user["bio"]:
+ <p>{{!render_markdown_links(profile_user["bio"])}}</p>
+ % else:
+ <p class="empty">No bio yet.</p>
+ % end
+ <p class="muted">Joined {{profile_user["created_at"]}}</p>
+ % if profile_user["website"]:
+ <p><a href="{{profile_user['website']}}" rel="nofollow">{{profile_user["website"]}}</a></p>
+ % end
+ % if is_self:
+ <p><a class="button small" href="/settings/profile">Edit profile</a></p>
+ % end
+</section>
+
+<section class="panel">
+ <div class="panel-heading">
+ <h2>Repositories</h2>
+ <nav class="tabs">
+ <a class="{{'active' if active_tab == 'owned' else ''}}" href="/{{profile_user['username']}}">Owned ({{len(owned_repos)}})</a>
+ <a class="{{'active' if active_tab == 'stars' else ''}}" href="/{{profile_user['username']}}?tab=stars">Starred ({{len(starred_repos)}})</a>
+ </nav>
+ </div>
+ % if repos:
+ % for repo in repos:
+ <a href="/{{repo['owner_username']}}/{{repo['name']}}">
+ <strong>{{repo["owner_username"]}}/{{repo["name"]}}</strong></a>
+ <br>
+ {{!render_markdown_links(repo["description"]) or "No description yet."}}
+ <br>
+ <small>Updated {{repo["updated_at"]}} · {{repo["star_count"]}} stars</small>
+ <br>
+ <br>
+ % end
+ % else:
+ <p class="empty">{{"No starred repositories yet." if active_tab == "stars" else "No repositories yet."}}</p>
+ % end
+</section>
diff --git a/templates/pull_request_detail.tpl b/templates/pull_request_detail.tpl
new file mode 100644
index 0000000..8fddd93
--- /dev/null
+++ b/templates/pull_request_detail.tpl
@@ -0,0 +1,85 @@
+% rebase("base.tpl", title="#" + str(pr["number"]) + " " + pr["title"], user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ <div class="panel-heading">
+ <div>
+ <h2>(#{{pr["number"]}}) {{pr["title"]}}</h2>
+ <p class="muted"><strong>{{pr["status"]}}</strong> <small>(created by <a href="/{{pr["author_username"]}}">{{pr["author_username"]}}</a> on {{pr["created_at"]}})</small></p>
+ <p class="muted"><a href="/{{pr["source_owner_username"]}}/{{pr["source_repo_name"]}}">{{pr["source_owner_username"]}}/{{pr["source_repo_name"]}}</a> into <a href="/{{pr["target_owner_username"]}}/{{pr["target_repo_name"]}}">{{pr["target_owner_username"]}}/{{pr["target_repo_name"]}}</a></p>
+ <p class="muted">{{format_ref_label(pr["source_ref_type"], pr["source_ref_name"])}} into {{format_ref_label(pr["target_ref_type"], pr["target_ref_name"])}}</p>
+ </div>
+ % if can_maintain and pr["status"] == "open":
+ <div class="filters">
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="merge">
+ <button class="button" type="submit">Merge</button>
+ </form>
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="close">
+ <button class="button secondary small" type="submit">Close</button>
+ </form>
+ </div>
+ % end
+ </div>
+ % if pr["body"]:
+ <pre class="readme">{{pr["body"]}}</pre>
+ % else:
+ <p class="empty">No description.</p>
+ % end
+ % if pr["status"] == "merged":
+ <p class="notice">Merged by {{pr["merged_by_username"] or "unknown"}} on {{pr["merged_at"]}} as <code>{{pr["merge_node"]}}</code>.</p>
+ % end
+</section>
+
+<section class="panel">
+ <h2>Comments</h2>
+ % if comments:
+ <div class="comment-list">
+ % for comment in comments:
+ <article class="comment">
+ <p><strong><a href="/{{comment["author_username"]}}">@{{comment["author_username"]}}</a>:</strong> {{!render_markdown_links(comment["body"])}} <small class="muted">{{comment["created_at"]}}</small></p>
+ </article>
+ % end
+ </div>
+ % else:
+ <p class="empty">No comments yet.</p>
+ % end
+
+ % if user:
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="comment">
+ <label>
+ Add a comment
+ <textarea name="body" rows="5">{{comment_value}}</textarea>
+ </label>
+ <button class="button" type="submit">Comment</button>
+ </form>
+ % else:
+ <p><a href="/login?next=/{{repo['owner_username']}}/{{repo['name']}}/pulls/{{pr['number']}}">Log in to comment</a></p>
+ % end
+</section>
+
+<section class="panel">
+ <h2>Diff</h2>
+ <p class="muted">Base <code>{{pr["base_node"]}}</code> to {{format_ref_label(pr["source_ref_type"], pr["source_ref_name"])}} <code>{{current_source_node}}</code></p>
+ % if diff_error:
+ <p class="alert">{{diff_error}}</p>
+ % elif diff:
+ <pre class="diff"><code class="language-diff">{{diff}}</code></pre>
+ % else:
+ <p class="empty">No diff for this pull request.</p>
+ % end
+</section>
diff --git a/templates/pull_requests.tpl b/templates/pull_requests.tpl
new file mode 100644
index 0000000..485f2f8
--- /dev/null
+++ b/templates/pull_requests.tpl
@@ -0,0 +1,40 @@
+% rebase("base.tpl", title=repo["owner_username"] + "/" + repo["name"] + " pull requests", user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ <div class="panel-heading">
+ <h2>Pull requests</h2>
+ <div class="filters">
+ <a class="{{'active' if status == 'open' else ''}}" href="?status=open">Open ({{counts["open"]}})</a>
+ <a class="{{'active' if status == 'merged' else ''}}" href="?status=merged">Merged ({{counts["merged"]}})</a>
+ <a class="{{'active' if status == 'closed' else ''}}" href="?status=closed">Closed ({{counts["closed"]}})</a>
+ <a class="{{'active' if status == 'all' else ''}}" href="?status=all">All</a>
+ % if user:
+ <a href="/{{repo['owner_username']}}/{{repo['name']}}/pulls/new">New pull request</a>
+ % else:
+ <a href="/login?next=/{{repo['owner_username']}}/{{repo['name']}}/pulls/new">New pull request</a>
+ % end
+ </div>
+ </div>
+ % if pull_requests:
+ <ul class="issue-list">
+ % for pr in pull_requests:
+ <li>
+ <a href="/{{repo['owner_username']}}/{{repo['name']}}/pulls/{{pr['number']}}">#{{pr["number"]}} {{pr["title"]}}</a>
+ <span>({{pr["status"]}} from {{pr["source_owner_username"]}}/{{pr["source_repo_name"]}} {{format_ref_label(pr["source_ref_type"], pr["source_ref_name"])}} into {{format_ref_label(pr["target_ref_type"], pr["target_ref_name"])}})</span>
+ </li>
+ % end
+ </ul>
+ % else:
+ <p class="empty">No {{status}} pull requests.</p>
+ % end
+</section>
diff --git a/templates/ref_selector.tpl b/templates/ref_selector.tpl
new file mode 100644
index 0000000..8966ff4
--- /dev/null
+++ b/templates/ref_selector.tpl
@@ -0,0 +1,40 @@
+% selected_ref = get("selected_ref", None)
+% ref_options = get("ref_options", [])
+% selected_ref_value = get("selected_ref_value", "")
+% selected_ref_label = get("selected_ref_label", ref_option_label(selected_ref) if selected_ref else "")
+% active_tab = get("repo_active_tab", "")
+% show_ref_picker = get("show_ref_picker", False)
+% if show_ref_picker and selected_ref and ref_options:
+ <div class="ref-picker" data-ref-picker>
+ <button class="ref-picker-toggle" type="button" aria-haspopup="true" aria-expanded="false">
+ <span>{{selected_ref_label}}</span>
+ </button>
+ <div class="ref-picker-menu" data-ref-picker-menu hidden>
+ <input class="ref-picker-search" type="search" placeholder="Find a ref..." aria-label="Find a ref" autocomplete="off" data-ref-picker-search>
+ <div class="ref-picker-options" role="menu">
+ % for option in ref_options:
+ % is_selected = option["value"] == selected_ref_value
+ <a
+ class="ref-picker-option {{'active' if is_selected else ''}}"
+ href="{{current_url_with_ref(option['ref'])}}"
+ data-ref-picker-option
+ data-ref-label="{{option['label'].lower()}}"
+ data-ref-initial="{{'true' if option.get('is_initial') or is_selected else 'false'}}"
+ role="menuitem"
+ aria-current="{{'page' if is_selected else 'false'}}"
+ >
+ <span>{{option["label"]}}</span>
+ % if is_selected:
+ <span class="ref-picker-current">current</span>
+ % end
+ </a>
+ % end
+ <div class="ref-picker-empty" data-ref-picker-empty hidden>No refs found</div>
+ </div>
+ <div class="ref-picker-footer">
+ <a class="ref-picker-link {{'active' if active_tab == 'tags' else ''}}" href="/{{repo['owner_username']}}/{{repo['name']}}/tags">Tags</a>
+ <a class="ref-picker-link {{'active' if active_tab == 'branches' else ''}}" href="/{{repo['owner_username']}}/{{repo['name']}}/branches">Branches</a>
+ </div>
+ </div>
+ </div>
+% end
diff --git a/templates/repo.tpl b/templates/repo.tpl
new file mode 100644
index 0000000..fb5ff61
--- /dev/null
+++ b/templates/repo.tpl
@@ -0,0 +1,53 @@
+% rebase("base.tpl", title=repo["owner_username"] + "/" + repo["name"], user=user, error=error, notice=notice)
+
+<section class="repo-header">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ <p>{{!render_markdown_links(repo["description"]) or "No description yet."}}</p>
+ </div>
+ <div class="clone-box">
+ <code>$ git clone {{clone_url}}</code>
+ </div>
+</section>
+
+<section class="panel">
+ % if readme_html is not None:
+ % if readme_truncated:
+ <p class="notice">README preview truncated. Use the source or raw view for the full file.</p>
+ % end
+ <div class="readme markdown-body">{{!readme_html}}</div>
+ % elif readme is not None:
+ % if readme_truncated:
+ <p class="notice">README preview truncated. Use the source or raw view for the full file.</p>
+ % end
+ <pre class="readme">{{readme}}</pre>
+ % else:
+ <div class="empty">
+ % if commit_count == 0:
+ <p>This repository is empty.</p>
+ <h3>Start with a fresh checkout</h3>
+ <pre>git clone {{clone_url}}
+cd {{repo["name"]}}
+echo "# {{repo["name"]}}" > README.md
+git add README.md
+git commit -m "Initial commit"
+git push -u origin main</pre>
+ <h3>Push an existing local Git repo</h3>
+ <pre>cd /path/to/existing-repo
+git remote add origin {{clone_url}}
+git push -u origin HEAD:main</pre>
+ <p class="muted">Push will ask for the repository owner's or a contributor's username and password.</p>
+ % else:
+ <p>This repository has no README.</p>
+ <pre>echo "# {{repo["name"]}}" > README.md
+git add README.md
+git commit -m "Add README"
+git push</pre>
+ % end
+ </div>
+ % end
+</section>
diff --git a/templates/repo_fork_eyebrow.tpl b/templates/repo_fork_eyebrow.tpl
new file mode 100644
index 0000000..0d7c97d
--- /dev/null
+++ b/templates/repo_fork_eyebrow.tpl
@@ -0,0 +1,4 @@
+% source_repo = get("source_repo", None)
+% if source_repo:
+ <p class="eyebrow">Fork of <a href="/{{source_repo['owner_username']}}/{{source_repo['name']}}">{{source_repo["owner_username"]}}/{{source_repo["name"]}}</a>.</p>
+% end
diff --git a/templates/repo_nav.tpl b/templates/repo_nav.tpl
new file mode 100644
index 0000000..b4946fd
--- /dev/null
+++ b/templates/repo_nav.tpl
@@ -0,0 +1,36 @@
+% active_tab = get("repo_active_tab", "")
+% selected_ref = get("selected_ref", None)
+<nav class="repo-tabs">
+ <a class="repo-tab {{'active' if active_tab == 'overview' else ''}}" href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'], selected_ref)}}">Overview</a>
+ <a class="repo-tab {{'active' if active_tab == 'source' else ''}}" href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src', selected_ref)}}">Source</a>
+ <a class="repo-tab {{'active' if active_tab == 'commits' else ''}}" href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/commits', selected_ref)}}">Commits{{" (" + str(commit_count) + ")" if commit_count else ""}}</a>
+ <a class="repo-tab {{'active' if active_tab == 'issues' else ''}}" href="/{{repo['owner_username']}}/{{repo['name']}}/issues">Issues{{" (" + str(issue_counts["open"]) + ")" if issue_counts["open"] else ""}}</a>
+ <a class="repo-tab {{'active' if active_tab == 'pulls' else ''}}" href="/{{repo['owner_username']}}/{{repo['name']}}/pulls">Pull requests{{" (" + str(pr_counts["open"]) + ")" if pr_counts["open"] else ""}}</a>
+ % if user:
+ <form class="inline-form" method="post" action="/{{repo['owner_username']}}/{{repo['name']}}/star">
+ {{!csrf_field()}}
+ % if is_starred:
+ <input type="hidden" name="action" value="unstar">
+ <button class="button-link" type="submit">Unstar ({{star_count}})</button>
+ % else:
+ <input type="hidden" name="action" value="star">
+ <button class="button-link" type="submit">Star ({{star_count}})</button>
+ % end
+ </form>
+ % else:
+ <a class="button-link" href="/login?next=/{{repo['owner_username']}}/{{repo['name']}}">Star ({{star_count}})</a>
+ % end
+ % if user and not is_owner and not has_fork:
+ <form class="inline-form" method="post" action="/{{repo['owner_username']}}/{{repo['name']}}/fork">
+ {{!csrf_field()}}
+ <input type="hidden" name="name" value="{{repo['name']}}">
+ <input type="hidden" name="description" value="{{repo['description']}}">
+ <button class="button-link" type="submit">Fork</button>
+ </form>
+ % elif not user:
+ <a href="/login?next=/{{repo['owner_username']}}/{{repo['name']}}/fork">Fork</a>
+ % end
+ % if is_owner:
+ <a class="repo-tab {{'active' if active_tab == 'settings' else ''}}" href="/{{repo['owner_username']}}/{{repo['name']}}/settings">Settings</a>
+ % end
+</nav>
diff --git a/templates/repo_settings.tpl b/templates/repo_settings.tpl
new file mode 100644
index 0000000..35f38a3
--- /dev/null
+++ b/templates/repo_settings.tpl
@@ -0,0 +1,68 @@
+% rebase("base.tpl", title=repo["owner_username"] + "/" + repo["name"] + " settings", user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ </div>
+</section>
+
+<section class="panel">
+ <h2>Description</h2>
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="save">
+ <label>
+ Repository description
+ <input name="description" value="{{repo['description']}}" maxlength="500" placeholder="Repository description">
+ </label>
+ <button class="button" type="submit">Save description</button>
+ </form>
+</section>
+
+<section class="panel">
+ <h2>Contributors</h2>
+ % if contributors:
+ <ul class="clean-list">
+ % for contributor in contributors:
+ <li class="panel-heading">
+ <span><a href="/{{contributor['username']}}">{{contributor["username"]}}</a> <span class="muted">added {{contributor["contributor_since"]}}</span></span>
+ <form class="inline-form" method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="remove_contributor">
+ <input type="hidden" name="user_id" value="{{contributor['id']}}">
+ <button class="button secondary small" type="submit">Remove</button>
+ </form>
+ </li>
+ % end
+ </ul>
+ % else:
+ <p class="empty">No contributors yet.</p>
+ % end
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="add_contributor">
+ <label>
+ Username
+ <input name="username" value="{{contributor_username}}" autocomplete="off" placeholder="username">
+ </label>
+ <button class="button" type="submit">Add contributor</button>
+ </form>
+</section>
+
+<section class="panel danger-zone">
+ <h2>Delete repository</h2>
+ <p>This permanently deletes the repository, its issues, pull requests, and Git data.</p>
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="action" value="delete">
+ <label>
+ Type {{repo["name"]}} to confirm
+ <input name="confirm_name" autocomplete="off">
+ </label>
+ <button class="button danger" type="submit">Delete repository</button>
+ </form>
+</section>
diff --git a/templates/repo_title.tpl b/templates/repo_title.tpl
new file mode 100644
index 0000000..1fd7422
--- /dev/null
+++ b/templates/repo_title.tpl
@@ -0,0 +1,4 @@
+<div class="repo-title-row">
+ <h1><a href="/{{repo['owner_username']}}">{{repo["owner_username"]}}</a>/{{repo["name"]}}</h1>
+ % include("ref_selector.tpl", repo=repo)
+</div>
diff --git a/templates/signup.tpl b/templates/signup.tpl
new file mode 100644
index 0000000..11636ba
--- /dev/null
+++ b/templates/signup.tpl
@@ -0,0 +1,19 @@
+% rebase("base.tpl", title="Sign up", user=user, error=error, notice=notice)
+
+<section class="auth-card">
+ <h1>Create account</h1>
+ <form method="post">
+ {{!csrf_field()}}
+ <input type="hidden" name="next" value="{{next_url}}">
+ <label>
+ Username
+ <input name="username" value="{{username if defined('username') else ''}}" required autocomplete="username" pattern="[a-z0-9][a-z0-9._-]{1,62}">
+ </label>
+ <label>
+ Password
+ <input name="password" type="password" required autocomplete="new-password" minlength="8">
+ </label>
+ <button class="button" type="submit">Sign up</button>
+ </form>
+ <p class="muted">Already have an account? <a href="/login">Log in</a>.</p>
+</section>
diff --git a/templates/source.tpl b/templates/source.tpl
new file mode 100644
index 0000000..8f41832
--- /dev/null
+++ b/templates/source.tpl
@@ -0,0 +1,41 @@
+% rebase("base.tpl", title=repo["owner_username"] + "/" + repo["name"] + " source", user=user, error=error, notice=notice)
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ <div class="breadcrumb">
+ <a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src', selected_ref)}}">root</a>
+ % if current_path:
+ % parts = current_path.split("/")
+ % running = ""
+ % for part in parts:
+ % running = part if not running else running + "/" + part
+ <span>/</span><a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src/' + quote_path(running), selected_ref)}}">{{part}}</a>
+ % end
+ % end
+ </div>
+ </div>
+</section>
+
+<section class="panel">
+ % if entries:
+ <ul class="file-list">
+ % for entry in entries:
+ <li>
+ <code>{{"dir" if entry["type"] == "dir" else "file"}}</code>
+ <div>
+ <a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src/' + quote_path(entry['path']), selected_ref)}}">
+ <strong>{{entry["name"]}}{{"/" if entry["type"] == "dir" else ""}}</strong>
+ </a>
+ </div>
+ </li>
+ % end
+ </ul>
+ % else:
+ <p class="empty">No files here.</p>
+ % end
+</section>
diff --git a/templates/tags.tpl b/templates/tags.tpl
new file mode 100644
index 0000000..2849541
--- /dev/null
+++ b/templates/tags.tpl
@@ -0,0 +1,36 @@
+% rebase("base.tpl", title=repo["owner_username"] + "/" + repo["name"] + " tags", user=user, error=error, notice=notice)
+
+
+<section class="repo-header slim">
+ <div>
+ % include("repo_fork_eyebrow.tpl")
+ % include("repo_title.tpl", repo=repo)
+
+ % include("repo_nav.tpl", repo=repo, commit_count=commit_count, issue_counts=issue_counts, pr_counts=pr_counts, star_count=star_count, is_starred=is_starred, is_owner=is_owner, can_maintain=can_maintain)
+
+ <p class="muted">Tags</p>
+ </div>
+</section>
+
+<section class="panel">
+ % if tags:
+ <ul class="commit-list">
+ % for tag in tags:
+ <li>
+ <code>{{tag["name"]}}</code>
+ <div style="margin-bottom:20px;">
+ <strong><a href="/{{repo['owner_username']}}/{{repo['name']}}/commits/{{tag['short_node']}}">{{tag["short_node"]}}</a></strong>
+ <small>commit {{tag["short_node"]}} · {{tag["date"]}}</small>
+ <p>{{tag["summary"]}}
+ <small><a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/src', tag, True)}}">Browse code</a> ·
+ <a href="{{url_with_ref('/' + repo['owner_username'] + '/' + repo['name'] + '/commits', tag, True)}}">Commits</a> ·
+ <a href="/{{repo['owner_username']}}/{{repo['name']}}/archive/{{tag['short_node']}}.zip">Archive</a></small>
+ </p>
+ </div>
+ </li>
+ % end
+ </ul>
+ % else:
+ <p class="empty">No tags yet.</p>
+ % end
+</section>
diff --git a/tests/test_app.py b/tests/test_app.py
new file mode 100644
index 0000000..6235833
--- /dev/null
+++ b/tests/test_app.py
@@ -0,0 +1,1281 @@
+import atexit
+import base64
+from http.cookies import SimpleCookie
+from io import BytesIO, StringIO
+import os
+import re
+import shutil
+import subprocess
+import sys
+import tempfile
+from pathlib import Path
+from urllib.parse import urlencode, urlsplit, unquote
+
+import pytest
+from bottle import HTTPError
+
+
+REPO_ROOT_FOR_IMPORTS = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(REPO_ROOT_FOR_IMPORTS))
+BOOTSTRAP_DIR = Path(tempfile.mkdtemp(prefix="gitman-test-bootstrap-"))
+atexit.register(shutil.rmtree, BOOTSTRAP_DIR, ignore_errors=True)
+os.environ["GITMAN_DB"] = str(BOOTSTRAP_DIR / "gitman.sqlite3")
+os.environ["GITMAN_REPO_ROOT"] = str(BOOTSTRAP_DIR / "repos")
+os.environ["SECRET_KEY"] = "test-secret"
+
+import app as gitman # noqa: E402
+
+
+class WsgiResponse:
+ def __init__(self, status_line, headers, body):
+ self.status_line = status_line
+ self.status_code = int(status_line.split(" ", 1)[0])
+ self.headers = headers
+ self.body = body
+ self.text = body.decode("utf-8", "replace")
+
+ def header(self, name, default=None):
+ name = name.lower()
+ for key, value in self.headers:
+ if key.lower() == name:
+ return value
+ return default
+
+ @property
+ def location(self):
+ return self.header("Location")
+
+ @property
+ def location_path(self):
+ location = self.location
+ if not location:
+ return None
+ split = urlsplit(location)
+ if not split.scheme and not split.netloc:
+ return location
+ return split.path + (f"?{split.query}" if split.query else "")
+
+
+class WsgiClient:
+ def __init__(self, wsgi_app):
+ self.wsgi_app = wsgi_app
+ self.cookies = {}
+ self.csrf_token = None
+
+ def get(self, path, headers=None):
+ return self.request("GET", path, headers=headers)
+
+ def post(self, path, data=None, headers=None):
+ return self.request("POST", path, data=data, headers=headers)
+
+ def request(self, method, path, data=None, headers=None):
+ headers = headers or {}
+ split = urlsplit(path)
+ body = b""
+ if method == "POST" and data is None and not split.path.startswith("/git/") and self.csrf_token:
+ data = {}
+ if data is not None:
+ if (
+ method == "POST"
+ and not split.path.startswith("/git/")
+ and gitman.CSRF_FORM_FIELD not in data
+ and self.csrf_token
+ ):
+ data = {**data, gitman.CSRF_FORM_FIELD: self.csrf_token}
+ body = urlencode(data, doseq=True).encode("utf-8")
+ headers = {"Content-Type": "application/x-www-form-urlencoded", **headers}
+ environ = {
+ "REQUEST_METHOD": method,
+ "SCRIPT_NAME": "",
+ "PATH_INFO": unquote(split.path),
+ "QUERY_STRING": split.query,
+ "SERVER_NAME": "example.test",
+ "SERVER_PORT": "80",
+ "SERVER_PROTOCOL": "HTTP/1.1",
+ "HTTP_HOST": "example.test",
+ "wsgi.version": (1, 0),
+ "wsgi.url_scheme": "http",
+ "wsgi.input": BytesIO(body),
+ "wsgi.errors": StringIO(),
+ "wsgi.multithread": False,
+ "wsgi.multiprocess": False,
+ "wsgi.run_once": False,
+ "CONTENT_LENGTH": str(len(body)),
+ "REMOTE_ADDR": "127.0.0.1",
+ }
+ if self.cookies:
+ environ["HTTP_COOKIE"] = "; ".join(f"{key}={value}" for key, value in self.cookies.items())
+ for key, value in headers.items():
+ env_key = key.upper().replace("-", "_")
+ if env_key == "CONTENT_TYPE":
+ environ["CONTENT_TYPE"] = value
+ elif env_key == "CONTENT_LENGTH":
+ environ["CONTENT_LENGTH"] = value
+ else:
+ environ[f"HTTP_{env_key}"] = value
+
+ captured = {}
+
+ def start_response(status, response_headers, exc_info=None):
+ captured["status"] = status
+ captured["headers"] = response_headers
+
+ body_iter = self.wsgi_app(environ, start_response)
+ try:
+ response_body = b"".join(
+ chunk if isinstance(chunk, bytes) else chunk.encode("utf-8") for chunk in body_iter
+ )
+ finally:
+ close = getattr(body_iter, "close", None)
+ if close:
+ close()
+ response = WsgiResponse(captured["status"], captured["headers"], response_body)
+ self._store_cookies(response.headers)
+ self._store_csrf_token(response.text)
+ return response
+
+ def _store_cookies(self, headers):
+ for key, value in headers:
+ if key.lower() != "set-cookie":
+ continue
+ cookie = SimpleCookie()
+ cookie.load(value)
+ for name, morsel in cookie.items():
+ if morsel.value == "" and (morsel["expires"] or morsel["max-age"] == "0"):
+ self.cookies.pop(name, None)
+ else:
+ self.cookies[name] = morsel.value
+
+ def _store_csrf_token(self, text):
+ match = re.search(r'name="_csrf_token"\s+value="([^"]+)"', text)
+ if match:
+ self.csrf_token = match.group(1)
+
+
+def login_client(client, username, password="correct horse battery staple", next_url="/"):
+ client.get("/login")
+ return client.post("/login", {"username": username, "password": password, "next": next_url})
+
+
[email protected]()
+def isolated_app(tmp_path, monkeypatch):
+ monkeypatch.setattr(gitman, "DB_PATH", tmp_path / "gitman.sqlite3")
+ monkeypatch.setattr(gitman, "REPO_ROOT", tmp_path / "repos")
+ gitman.AUTH_FAILURES.clear()
+ gitman.init_db()
+ return gitman
+
+
+def create_user(username, password="correct horse battery staple"):
+ with gitman.db_connect() as conn:
+ conn.execute(
+ "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
+ (username, gitman.hash_password(password), gitman.utcnow()),
+ )
+ return gitman.get_user_by_username(username)
+
+
+def commit_file(repo_path, relative_path, content, message="initial commit", user="alice", branch=None):
+ with tempfile.TemporaryDirectory(prefix="gitman-test-work-") as tempdir:
+ work_path = Path(tempdir) / "work"
+ gitman.run_git(["clone", str(repo_path), str(work_path)], timeout=60)
+ gitman.run_git(["config", "user.name", user], cwd=work_path)
+ gitman.run_git(["config", "user.email", "[email protected]"], cwd=work_path)
+ if branch:
+ checkout = gitman.run_git(["checkout", branch], cwd=work_path, check=False)
+ if checkout.returncode != 0:
+ gitman.run_git(["checkout", "-b", branch], cwd=work_path)
+ target = work_path / relative_path
+ target.parent.mkdir(parents=True, exist_ok=True)
+ target.write_text(content, encoding="utf-8")
+ gitman.run_git(["add", relative_path], cwd=work_path)
+ gitman.run_git(["commit", "-m", message], cwd=work_path)
+ node = gitman.run_git(["rev-parse", "HEAD"], cwd=work_path).stdout.strip()
+ current_branch = gitman.run_git(["branch", "--show-current"], cwd=work_path).stdout.strip() or "main"
+ gitman.run_git(["push", "origin", f"HEAD:refs/heads/{current_branch}"], cwd=work_path, timeout=60)
+ return node
+
+
+def basic_auth(username, password):
+ token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
+ return {"Authorization": f"Basic {token}"}
+
+
+def create_repo_with_refs(owner):
+ gitman.create_repository(owner, "demo", "Demo repository")
+ path = gitman.repo_path(owner["username"], "demo")
+ default_node = commit_file(path, "README.md", "# Demo\n", message="initial", user=owner["username"])
+
+ feature_node = commit_file(
+ path,
+ "feature.txt",
+ "feature work\n",
+ message="add feature",
+ user=owner["username"],
+ branch="feature",
+ )
+ gitman.run_git(["tag", "v1.0", feature_node], cwd=path)
+
+ old_node = commit_file(
+ path,
+ "old.txt",
+ "old branch\n",
+ message="old branch work",
+ user=owner["username"],
+ branch="old",
+ )
+
+ return {
+ "path": path,
+ "default_node": default_node,
+ "feature_node": feature_node,
+ "old_node": old_node,
+ }
+
+
+def test_normalize_slug_accepts_trimmed_lowercase_values():
+ assert gitman.normalize_slug(" Demo_Repo-1 ", "Repository name") == "demo_repo-1"
+ assert gitman.normalize_slug("My.Name", "Repository name") == "my.name"
+
+
[email protected](
+ ("value", "label"),
+ [
+ ("x", "Repository name"),
+ ("bad/name", "Repository name"),
+ ("-bad", "Repository name"),
+ ("demo.git", "Repository name"),
+ ("login", "Username"),
+ ("harrisonerd", "Username"),
+ ],
+)
+def test_normalize_slug_rejects_invalid_or_reserved_values(value, label):
+ with pytest.raises(ValueError):
+ gitman.normalize_slug(value, label)
+
+
+def test_clean_repo_path_normalizes_and_rejects_traversal():
+ assert gitman.clean_repo_path("/docs/readme.md/") == "docs/readme.md"
+ assert gitman.clean_repo_path("") == ""
+
+ for value in ("../secret", "docs/../secret", "./file"):
+ with pytest.raises(HTTPError) as exc_info:
+ gitman.clean_repo_path(value)
+ assert exc_info.value.status_code == 400
+
+
+def test_password_hashes_verify_and_reject_bad_inputs():
+ stored = gitman.hash_password("correct-password")
+
+ assert stored.startswith("pbkdf2_sha256$")
+ assert gitman.verify_password("correct-password", stored)
+ assert not gitman.verify_password("wrong-password", stored)
+ assert not gitman.verify_password("correct-password", "not-a-valid-hash")
+
+
[email protected](
+ ("value", "expected"),
+ [
+ ("example.com", "https://example.com"),
+ (" http://example.com/path ", "http://example.com/path"),
+ ("https://example.com", "https://example.com"),
+ ],
+)
+def test_normalize_website_accepts_http_urls_and_adds_scheme(value, expected):
+ assert gitman.normalize_website(value) == expected
+
+
[email protected]("value", ["ftp://example.com", "https://", "not a url"])
+def test_normalize_website_rejects_invalid_urls(value):
+ with pytest.raises(ValueError):
+ gitman.normalize_website(value)
+
+
+def test_render_markdown_strips_scripts_and_unsafe_links():
+ rendered = gitman.render_markdown(
+ """
+# Title
+
+<script>alert("x")</script>
+
+[safe](https://example.com) [unsafe](javascript:alert(1))
+
+| A |
+| - |
+| B |
+"""
+ )
+
+ assert "<h1>Title</h1>" in rendered
+ assert "script" not in rendered.lower()
+ assert "javascript:" not in rendered.lower()
+ assert 'href="https://example.com"' in rendered
+ assert "<table>" in rendered
+
+
+def test_render_markdown_links_allows_only_links():
+ rendered = gitman.render_markdown_links("[site](https://example.com) **bold** <strong>x</strong>")
+
+ assert rendered == '<a href="https://example.com">site</a> bold <strong>x</strong>'
+
+
+def test_startup_config_rejects_default_secret_outside_debug(monkeypatch):
+ monkeypatch.setattr(gitman, "DEBUG", False)
+ monkeypatch.setattr(gitman, "SECRET_KEY", gitman.DEFAULT_SECRET_KEY)
+
+ with pytest.raises(RuntimeError):
+ gitman.validate_startup_config()
+
+ monkeypatch.setattr(gitman, "SECRET_KEY", "production-secret")
+ gitman.validate_startup_config()
+
+
+def test_security_headers_and_secure_cookie_flags(isolated_app):
+ client = WsgiClient(isolated_app.app)
+
+ response = client.get("/login", headers={"X-Forwarded-Proto": "https"})
+ assert response.status_code == 200
+ assert response.header("X-Content-Type-Options") == "nosniff"
+ assert response.header("Referrer-Policy") == "same-origin"
+ assert response.header("X-Frame-Options") == "DENY"
+ assert "frame-ancestors 'none'" in response.header("Content-Security-Policy")
+ csrf_cookie = response.header("Set-Cookie")
+ assert "csrf_token=" in csrf_cookie
+ assert "HttpOnly" in csrf_cookie
+ assert "samesite=lax" in csrf_cookie.lower()
+ assert "Secure" in csrf_cookie
+
+ create_user("alice")
+ response = client.post(
+ "/login",
+ {"username": "alice", "password": "correct horse battery staple", "next": "/"},
+ headers={"X-Forwarded-Proto": "https"},
+ )
+ assert response.status_code == 303
+ session_cookie = response.header("Set-Cookie")
+ assert "session=" in session_cookie
+ assert "HttpOnly" in session_cookie
+ assert "samesite=lax" in session_cookie.lower()
+ assert "Secure" in session_cookie
+
+
+def test_styles_support_system_dark_mode(isolated_app):
+ css = (REPO_ROOT_FOR_IMPORTS / "static" / "styles.css").read_text(encoding="utf-8")
+ client = WsgiClient(isolated_app.app)
+ response = client.get("/")
+
+ assert "@media (prefers-color-scheme: dark)" in css
+ assert "color-scheme: dark" in css
+ assert "var(--page-bg)" in css
+ assert "github-dark.min.css" in response.text
+ assert 'media="(prefers-color-scheme: dark)"' in response.text
+
+
+def test_csrf_required_for_browser_posts_and_git_is_exempt(isolated_app):
+ owner = create_user("alice", password="owner-password")
+ isolated_app.create_repository(owner, "demo", "")
+ client = WsgiClient(isolated_app.app)
+
+ response = client.post("/login", {"username": "alice", "password": "owner-password", "next": "/"})
+ assert response.status_code == 403
+ assert "Invalid CSRF token." in response.text
+
+ client.get("/login")
+ response = client.post(
+ "/login",
+ {"username": "alice", "password": "owner-password", "next": "/", gitman.CSRF_FORM_FIELD: "bad-token"},
+ )
+ assert response.status_code == 403
+
+ response = client.post("/git/alice/demo/git-receive-pack")
+ assert response.status_code == 401
+ assert response.header("WWW-Authenticate") == 'Basic realm="GitMan"'
+ assert response.header("Connection") is None
+
+
+def test_browser_form_size_limit(isolated_app, monkeypatch):
+ monkeypatch.setattr(gitman, "MAX_FORM_BYTES", 40)
+ client = WsgiClient(isolated_app.app)
+ client.get("/login")
+
+ response = client.post("/login", {"username": "a" * 100, "password": "bad", "next": "/"})
+ assert response.status_code == 413
+ assert "Request body too large." in response.text
+
+
+def test_login_and_git_auth_failures_are_rate_limited(isolated_app, monkeypatch):
+ monkeypatch.setattr(gitman, "RATE_LIMIT_MAX_FAILURES", 2)
+ monkeypatch.setattr(gitman, "RATE_LIMIT_COOLDOWN_SECONDS", 60)
+ create_user("alice", password="owner-password")
+ client = WsgiClient(isolated_app.app)
+ client.get("/login")
+
+ for _ in range(2):
+ response = client.post("/login", {"username": "alice", "password": "wrong", "next": "/"})
+ assert response.status_code == 200
+ response = client.post("/login", {"username": "alice", "password": "wrong", "next": "/"})
+ assert response.status_code == 429
+ assert response.header("Retry-After") == "60"
+
+ gitman.AUTH_FAILURES.clear()
+ owner = gitman.get_user_by_username("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ for _ in range(2):
+ response = client.post("/git/alice/demo/git-receive-pack", headers=basic_auth("alice", "wrong"))
+ assert response.status_code == 401
+ assert response.header("Connection") is None
+ response = client.post("/git/alice/demo/git-receive-pack", headers=basic_auth("alice", "wrong"))
+ assert response.status_code == 429
+ assert response.header("Connection") is None
+
+
+def test_readme_and_file_previews_are_truncated(isolated_app, monkeypatch):
+ monkeypatch.setattr(gitman, "MAX_RENDER_BYTES", 32)
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ commit_file(isolated_app.repo_path("alice", "demo"), "README.md", "A" * 200, message="large readme")
+ client = WsgiClient(isolated_app.app)
+
+ response = client.get("/alice/demo")
+ assert response.status_code == 200
+ assert "README preview truncated." in response.text
+
+ response = client.get("/alice/demo/src/README.md")
+ assert response.status_code == 200
+ assert "File preview truncated." in response.text
+
+
+def test_build_tree_deduplicates_entries_and_sorts_directories_first():
+ files = ["README.md", "src/app.py", "src/utils/helpers.py", "docs/index.md", "src/z.txt"]
+
+ assert gitman.build_tree(files, "") == [
+ {"name": "docs", "path": "docs", "type": "dir"},
+ {"name": "src", "path": "src", "type": "dir"},
+ {"name": "README.md", "path": "README.md", "type": "file"},
+ ]
+ assert gitman.build_tree(files, "src") == [
+ {"name": "utils", "path": "src/utils", "type": "dir"},
+ {"name": "app.py", "path": "src/app.py", "type": "file"},
+ {"name": "z.txt", "path": "src/z.txt", "type": "file"},
+ ]
+
+
+def test_ref_option_values_round_trip_quoted_names():
+ branch_name = "feature/a|b c"
+ value = gitman.ref_option_value(gitman.REF_TYPE_BRANCH, branch_name)
+
+ assert value == "branch|feature%2Fa%7Cb%20c"
+ assert gitman.parse_ref_option_value(value) == (gitman.REF_TYPE_BRANCH, branch_name)
+
+ tag_name = "release/a|b c"
+ tag_value = gitman.ref_option_value(gitman.REF_TYPE_TAG, tag_name)
+
+ assert tag_value == "tag|release%2Fa%7Cb%20c"
+ assert gitman.parse_ref_option_value(tag_value) == (gitman.REF_TYPE_TAG, tag_name)
+
+
+def test_source_ref_option_values_round_trip_and_validate_source_repo():
+ branch_name = "feature/a|b"
+ value = gitman.source_ref_option_value(42, gitman.REF_TYPE_BRANCH, branch_name)
+
+ assert gitman.parse_source_ref_option_value(value) == (42, gitman.REF_TYPE_BRANCH, branch_name)
+
+ with pytest.raises(ValueError, match="Invalid source repository"):
+ gitman.parse_source_ref_option_value(f"not-int|{gitman.REF_TYPE_BRANCH}|main")
+
+ with pytest.raises(ValueError, match="Invalid source ref"):
+ gitman.parse_source_ref_option_value(f"42|{gitman.REF_TYPE_COMMIT}|abc123")
+
+
+def test_ref_query_and_url_helpers_skip_default_refs_unless_forced():
+ ref = {"type": gitman.REF_TYPE_BRANCH, "name": "main", "is_default": True}
+
+ assert gitman.ref_query_string(ref) == ""
+ assert gitman.ref_query_string(ref, force=True) == "ref_type=branch&ref=main"
+ assert gitman.url_with_ref("/alice/demo", ref, force=True) == "/alice/demo?ref_type=branch&ref=main"
+ assert gitman.url_with_ref("/alice/demo?tab=files", ref, force=True) == (
+ "/alice/demo?tab=files&ref_type=branch&ref=main"
+ )
+
+
+def test_format_ref_label_and_option_label():
+ assert gitman.format_ref_label(gitman.REF_TYPE_TIP) == "HEAD"
+ assert gitman.format_ref_label(gitman.REF_TYPE_TAG, "v1.0") == "tag v1.0"
+ assert gitman.format_ref_label(gitman.REF_TYPE_COMMIT, "abcdef1234567890") == "commit abcdef123456"
+ assert gitman.ref_option_label({"type": gitman.REF_TYPE_BRANCH, "name": "old", "closed": False}) == "branch old"
+ assert gitman.ref_option_label({"type": gitman.REF_TYPE_TAG, "name": "v1.0", "local": False}) == "tag v1.0"
+
+
+def test_init_db_creates_expected_tables_and_is_idempotent(isolated_app):
+ isolated_app.init_db()
+
+ with isolated_app.db_connect() as conn:
+ tables = {
+ row["name"]
+ for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
+ }
+ user_columns = {row["name"] for row in conn.execute("PRAGMA table_info(users)")}
+ repo_columns = {row["name"] for row in conn.execute("PRAGMA table_info(repositories)")}
+ pr_columns = {row["name"] for row in conn.execute("PRAGMA table_info(pull_requests)")}
+
+ assert {"users", "repositories", "issues", "pull_requests", "repo_stars"}.issubset(tables)
+ assert {"display_name", "bio", "website"}.issubset(user_columns)
+ assert {"forked_from_repo_id", "forked_at", "forked_from_node"}.issubset(repo_columns)
+ assert {"target_ref_type", "target_ref_name", "source_ref_type", "source_ref_name"}.issubset(pr_columns)
+
+
+def test_db_connect_configures_sqlite_for_worker_contention(isolated_app):
+ isolated_app.init_db()
+
+ with isolated_app.db_connect() as conn:
+ foreign_keys = conn.execute("PRAGMA foreign_keys").fetchone()[0]
+ busy_timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
+ synchronous = conn.execute("PRAGMA synchronous").fetchone()[0]
+ journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
+
+ assert foreign_keys == 1
+ assert busy_timeout == gitman.SQLITE_BUSY_TIMEOUT_MS
+ assert synchronous == 1
+ assert journal_mode.lower() == "wal"
+
+
+def test_sqlite_accepts_concurrent_worker_writes(isolated_app):
+ isolated_app.init_db()
+ repo_root = Path(__file__).resolve().parents[1]
+ env = os.environ.copy()
+ existing_pythonpath = env.get("PYTHONPATH")
+ env["PYTHONPATH"] = str(repo_root) if not existing_pythonpath else f"{repo_root}{os.pathsep}{existing_pythonpath}"
+
+ worker_script = """
+import os
+import sys
+import time
+
+os.environ["GITMAN_DB"] = sys.argv[1]
+os.environ["GITMAN_REPO_ROOT"] = sys.argv[2]
+os.environ["SECRET_KEY"] = "test-secret"
+
+import app as gitman
+
+with gitman.db_connect() as conn:
+ conn.execute("BEGIN IMMEDIATE")
+ conn.execute(
+ "INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
+ (sys.argv[3], "hash", gitman.utcnow()),
+ )
+ time.sleep(0.2)
+"""
+
+ processes = [
+ subprocess.Popen(
+ [
+ sys.executable,
+ "-c",
+ worker_script,
+ str(isolated_app.DB_PATH),
+ str(isolated_app.REPO_ROOT),
+ f"worker-{index}",
+ ],
+ cwd=repo_root,
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ )
+ for index in range(3)
+ ]
+
+ results = []
+ try:
+ for process in processes:
+ stdout, stderr = process.communicate(timeout=15)
+ results.append((process.returncode, stdout, stderr))
+ finally:
+ for process in processes:
+ if process.poll() is None:
+ process.kill()
+
+ assert results
+ for returncode, stdout, stderr in results:
+ assert returncode == 0, stdout + stderr
+
+ with isolated_app.db_connect() as conn:
+ count = conn.execute("SELECT COUNT(*) FROM users WHERE username LIKE 'worker-%'").fetchone()[0]
+
+ assert count == 3
+
+
+def test_create_repository_persists_metadata_and_writes_git_config(isolated_app):
+ owner = create_user("alice")
+
+ isolated_app.create_repository(owner, "demo", "line one\nline two")
+
+ repo = isolated_app.get_repo("alice", "demo")
+ path = isolated_app.repo_path("alice", "demo")
+
+ assert repo["owner_username"] == "alice"
+ assert repo["name"] == "demo"
+ assert path.joinpath("objects").is_dir()
+ assert path.joinpath("HEAD").read_text(encoding="utf-8").strip() == "ref: refs/heads/main"
+ assert path.joinpath("description").read_text(encoding="utf-8") == "line one line two"
+ assert isolated_app.run_git(["config", "gitman.owner"], cwd=path).stdout.strip() == "alice"
+ assert isolated_app.run_git(["config", "gitman.name"], cwd=path).stdout.strip() == "demo"
+ assert isolated_app.run_git(["config", "gitman.description"], cwd=path).stdout.strip() == "line one line two"
+ assert isolated_app.run_git(["config", "http.receivepack"], cwd=path).stdout.strip() == "true"
+ assert isolated_app.run_git(["config", "receive.denyDeleteCurrent"], cwd=path).stdout.strip() == "warn"
+
+
+def test_create_repository_rolls_back_database_and_files_when_git_init_fails(isolated_app, monkeypatch):
+ owner = create_user("alice")
+
+ def fail_git(*args, **kwargs):
+ raise gitman.GitCommandError("init failed")
+
+ monkeypatch.setattr(isolated_app, "run_git", fail_git)
+
+ with pytest.raises(gitman.GitCommandError, match="init failed"):
+ isolated_app.create_repository(owner, "broken", "")
+
+ assert isolated_app.get_repo("alice", "broken") is None
+ assert not isolated_app.repo_path("alice", "broken").exists()
+
+
+def test_star_and_contributor_helpers_update_database_and_git_config(isolated_app):
+ owner = create_user("alice")
+ contributor = create_user("bob")
+ isolated_app.create_repository(owner, "demo", "")
+ repo = isolated_app.get_repo("alice", "demo")
+
+ isolated_app.star_repo(contributor, repo)
+ assert isolated_app.repo_star_count(repo["id"]) == 1
+ assert isolated_app.user_starred_repo(contributor, repo)
+
+ isolated_app.add_repo_contributor(repo, owner, "bob")
+ assert isolated_app.user_can_maintain_repo(contributor, repo)
+ repo_path = isolated_app.repo_path("alice", "demo")
+ assert isolated_app.run_git(["config", "http.receivepack"], cwd=repo_path).stdout.strip() == "true"
+
+ isolated_app.unstar_repo(contributor, repo)
+ isolated_app.remove_repo_contributor(repo, contributor["id"])
+ assert isolated_app.repo_star_count(repo["id"]) == 0
+ assert not isolated_app.user_starred_repo(contributor, repo)
+ assert not isolated_app.user_can_maintain_repo(contributor, repo)
+
+
+def test_issue_queries_count_filter_and_order_comments(isolated_app):
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ repo = isolated_app.get_repo("alice", "demo")
+ now = isolated_app.utcnow()
+
+ with isolated_app.db_connect() as conn:
+ conn.execute(
+ """
+ INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at)
+ VALUES (?, ?, 1, 'open issue', 'body', 'open', ?, ?)
+ """,
+ (repo["id"], owner["id"], now, now),
+ )
+ conn.execute(
+ """
+ INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at, closed_at)
+ VALUES (?, ?, 2, 'closed issue', '', 'closed', ?, ?, ?)
+ """,
+ (repo["id"], owner["id"], now, now, now),
+ )
+ issue_id = conn.execute("SELECT id FROM issues WHERE number = 1").fetchone()["id"]
+ conn.execute(
+ """
+ INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
+ VALUES (?, ?, 'second', '2026-01-01T00:00:02Z', '2026-01-01T00:00:02Z')
+ """,
+ (issue_id, owner["id"]),
+ )
+ conn.execute(
+ """
+ INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
+ VALUES (?, ?, 'first', '2026-01-01T00:00:01Z', '2026-01-01T00:00:01Z')
+ """,
+ (issue_id, owner["id"]),
+ )
+
+ assert isolated_app.issue_counts(repo["id"]) == {"open": 1, "closed": 1}
+ assert [issue["number"] for issue in isolated_app.list_issues(repo["id"], "all")] == [2, 1]
+ assert [issue["number"] for issue in isolated_app.list_issues(repo["id"], "invalid")] == [1]
+ assert [comment["body"] for comment in isolated_app.list_issue_comments(issue_id)] == ["first", "second"]
+
+
+def test_git_read_helpers_return_files_readme_commits_and_default_ref(isolated_app):
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ path = isolated_app.repo_path("alice", "demo")
+ node = commit_file(path, "README.md", "# Demo\n", message="add readme", user="Alice <[email protected]>")
+
+ files = isolated_app.git_files(path)
+ readme_name, readme = isolated_app.readme_for_repo(path, files)
+ commits = isolated_app.commit_log(path)
+ ref = isolated_app.default_code_ref(path)
+
+ assert files == ["README.md"]
+ assert (readme_name, readme) == ("README.md", "# Demo\n")
+ assert commits[0]["summary"] == "add readme"
+ assert isolated_app.commit_count(path) == 1
+ assert ref["type"] == isolated_app.REF_TYPE_BRANCH
+ assert ref["name"] == "main"
+ assert ref["node"] == node
+ assert isolated_app.repo_has_revision(path, node)
+ assert isolated_app.is_ancestor(path, isolated_app.NULL_REV, node)
+
+
+def test_first_pushed_master_branch_becomes_default_and_stale_head_falls_back(isolated_app):
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ path = isolated_app.repo_path("alice", "demo")
+ node = commit_file(path, "README.md", "# Demo\n", message="initial", user="alice", branch="master")
+
+ assert isolated_app.repo_head_branch(path) == "master"
+ assert isolated_app.default_code_ref(path)["name"] == "master"
+ assert isolated_app.default_code_ref(path)["node"] == node
+ assert isolated_app.commit_count(path) == 1
+ assert isolated_app.commit_log(path)[0]["node"] == node
+
+ isolated_app.run_git(["symbolic-ref", "HEAD", "refs/heads/main"], cwd=path)
+ assert isolated_app.repo_tip_node(path) is None
+
+ client = WsgiClient(isolated_app.app)
+ overview = client.get("/alice/demo")
+ commits = client.get("/alice/demo/commits")
+
+ assert overview.status_code == 200
+ assert "<h1>Demo</h1>" in overview.text
+ assert "This repository is empty." not in overview.text
+ assert "branch master" in overview.text
+ assert commits.status_code == 200
+ assert "initial" in commits.text
+ assert "Commits (1)" in commits.text
+ assert isolated_app.repo_head_branch(path) == "master"
+
+
+def test_deleting_current_branch_moves_head_to_surviving_branch(isolated_app):
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "demo", "")
+ path = isolated_app.repo_path("alice", "demo")
+ main_node = commit_file(path, "README.md", "# Demo\n", message="initial", user="alice")
+ commit_file(path, "LEGACY.md", "legacy\n", message="legacy branch", user="alice", branch="master")
+ isolated_app.run_git(["symbolic-ref", "HEAD", "refs/heads/master"], cwd=path)
+
+ with tempfile.TemporaryDirectory(prefix="gitman-delete-branch-") as tempdir:
+ work_path = Path(tempdir) / "work"
+ isolated_app.run_git(["clone", str(path), str(work_path)], timeout=60)
+ isolated_app.run_git(["push", "origin", "--delete", "master"], cwd=work_path, timeout=60)
+
+ assert isolated_app.run_git(["show-ref", "--verify", "refs/heads/master"], cwd=path, check=False).returncode != 0
+ assert isolated_app.repo_head_branch(path) == "main"
+ assert isolated_app.default_code_ref(path)["node"] == main_node
+
+
+def test_create_pull_request_between_fork_repositories(isolated_app):
+ owner = create_user("alice")
+ author = create_user("bob")
+ isolated_app.create_repository(owner, "demo", "")
+ target_path = isolated_app.repo_path("alice", "demo")
+ base_node = commit_file(target_path, "README.md", "# Demo\n", message="initial", user="alice")
+ target_repo = isolated_app.get_repo("alice", "demo")
+
+ isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
+ source_repo = isolated_app.get_repo("bob", "demo-fork")
+ source_path = isolated_app.repo_path("bob", "demo-fork")
+ source_node = commit_file(source_path, "feature.txt", "new feature\n", message="add feature", user="bob")
+
+ number = isolated_app.create_pull_request(
+ target_repo,
+ source_repo,
+ author,
+ "Add feature",
+ "Please merge this",
+ isolated_app.REF_TYPE_TIP,
+ "",
+ isolated_app.REF_TYPE_BRANCH,
+ "main",
+ )
+ pr = isolated_app.get_pull_request(target_repo["id"], number)
+ diff, current_source_node, source_ref = isolated_app.pull_request_diff(pr)
+
+ assert number == 1
+ assert pr["base_node"] == base_node
+ assert pr["source_node"] == source_node
+ assert pr["source_owner_username"] == "bob"
+ assert pr["target_owner_username"] == "alice"
+ assert current_source_node == source_node
+ assert source_ref["type"] == isolated_app.REF_TYPE_TIP
+ assert "feature.txt" in diff
+ assert "new feature" in diff
+
+
+def test_pull_request_source_refs_can_use_fork_branches(isolated_app):
+ owner = create_user("alice")
+ author = create_user("bob")
+ isolated_app.create_repository(owner, "demo", "")
+ target_path = isolated_app.repo_path("alice", "demo")
+ commit_file(target_path, "README.md", "# Demo\n", message="initial", user="alice")
+ target_repo = isolated_app.get_repo("alice", "demo")
+
+ isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
+ source_repo = isolated_app.get_repo("bob", "demo-fork")
+ source_path = isolated_app.repo_path("bob", "demo-fork")
+ branch_node = commit_file(
+ source_path,
+ "feature.txt",
+ "branch feature\n",
+ message="branch feature",
+ user="bob",
+ branch="feature/pr",
+ )
+
+ source_labels = [option["label"] for option in isolated_app.source_repo_ref_options(source_repo)]
+
+ assert "bob/demo-fork branch feature/pr" in source_labels
+ assert not any(" tag " in label for label in source_labels)
+
+ branch_pr_number = isolated_app.create_pull_request(
+ target_repo,
+ source_repo,
+ author,
+ "Branch PR",
+ "",
+ isolated_app.REF_TYPE_BRANCH,
+ "feature/pr",
+ isolated_app.REF_TYPE_BRANCH,
+ "main",
+ )
+ branch_pr = isolated_app.get_pull_request(target_repo["id"], branch_pr_number)
+ branch_diff, branch_source_node, branch_source_ref = isolated_app.pull_request_diff(branch_pr)
+
+ assert branch_pr["source_ref_type"] == isolated_app.REF_TYPE_BRANCH
+ assert branch_pr["source_ref_name"] == "feature/pr"
+ assert branch_pr["source_node"] == branch_node
+ assert branch_source_node == branch_node
+ assert branch_source_ref["type"] == isolated_app.REF_TYPE_BRANCH
+ assert "branch feature" in branch_diff
+
+
+def test_pull_requests_can_use_branches_from_the_target_repository(isolated_app):
+ owner = create_user("alice")
+ create_user("bob")
+ isolated_app.create_repository(owner, "demo", "")
+ path = isolated_app.repo_path("alice", "demo")
+ base_node = commit_file(path, "README.md", "# Demo\n", message="initial", user="alice")
+ source_node = commit_file(
+ path,
+ "feature.txt",
+ "in repo branch\n",
+ message="add in-repo feature",
+ user="alice",
+ branch="feature/in-repo",
+ )
+ target_repo = isolated_app.get_repo("alice", "demo")
+
+ bob_client = WsgiClient(isolated_app.app)
+ login_client(bob_client, "bob")
+ response = bob_client.get("/alice/demo/pulls/new")
+ assert response.status_code == 200
+ assert "alice/demo branch feature/in-repo" in response.text
+ assert "This repository has no source branches yet." not in response.text
+
+ response = bob_client.post(
+ "/alice/demo/pulls/new",
+ {
+ "source_ref": isolated_app.source_ref_option_value(
+ target_repo["id"], isolated_app.REF_TYPE_BRANCH, "feature/in-repo"
+ ),
+ "target_ref": isolated_app.ref_option_value(isolated_app.REF_TYPE_BRANCH, "main"),
+ "title": "Add in-repo feature",
+ "body": "Please merge this branch",
+ },
+ )
+ assert response.status_code == 303
+ assert response.location_path == "/alice/demo/pulls/1"
+
+ pr = isolated_app.get_pull_request(target_repo["id"], 1)
+ assert pr["source_repo_id"] == target_repo["id"]
+ assert pr["base_node"] == base_node
+ assert pr["source_node"] == source_node
+ assert pr["source_ref_type"] == isolated_app.REF_TYPE_BRANCH
+ assert pr["source_ref_name"] == "feature/in-repo"
+
+ diff, current_source_node, source_ref = isolated_app.pull_request_diff(pr)
+ assert current_source_node == source_node
+ assert source_ref["name"] == "feature/in-repo"
+ assert "feature.txt" in diff
+
+ owner_client = WsgiClient(isolated_app.app)
+ login_client(owner_client, "alice")
+ response = owner_client.post("/alice/demo/pulls/1", {"action": "merge"})
+ assert response.status_code == 303
+ assert isolated_app.run_git(["rev-parse", "refs/heads/main"], cwd=path).stdout.strip() == source_node
+
+
+def test_branch_tag_helpers_resolve_and_filter_refs(isolated_app):
+ owner = create_user("alice")
+ nodes = create_repo_with_refs(owner)
+ path = nodes["path"]
+
+ branches = {branch["name"]: branch for branch in isolated_app.list_repo_branches(path)}
+ tags = isolated_app.list_repo_tags(path)
+ target_labels = [option["label"] for option in isolated_app.target_repo_ref_options(path)]
+ all_labels = [option["label"] for option in isolated_app.repo_ref_options(path)]
+ source_labels = [
+ option["label"] for option in isolated_app.source_repo_ref_options(isolated_app.get_repo("alice", "demo"))
+ ]
+
+ assert {"main", "feature", "old"}.issubset(branches)
+ assert branches["main"]["node"] == nodes["default_node"]
+ assert branches["feature"]["closed"] is False
+ assert branches["old"]["closed"] is False
+ assert tags[0]["name"] == "v1.0"
+ assert tags[0]["type"] == isolated_app.REF_TYPE_TAG
+ assert tags[0]["node"] == nodes["feature_node"]
+ assert isolated_app.default_code_ref(path)["node"] == nodes["default_node"]
+ assert isolated_app.resolve_repo_ref(path, isolated_app.REF_TYPE_BRANCH, "feature")["name"] == "feature"
+ assert isolated_app.resolve_repo_ref(path, isolated_app.REF_TYPE_TAG, "v1.0")["node"] == nodes["feature_node"]
+ assert isolated_app.commit_ref(path, nodes["feature_node"])["type"] == isolated_app.REF_TYPE_COMMIT
+ assert "tag v1.0" in all_labels
+ assert "tag v1.0" not in target_labels
+ assert "HEAD" not in target_labels
+ assert "branch old" in target_labels
+ assert "branch old" in all_labels
+ assert "alice/demo tag v1.0" not in source_labels
+
+
+def test_repo_ref_options_mark_ten_newest_named_refs_for_picker(isolated_app):
+ owner = create_user("alice")
+ isolated_app.create_repository(owner, "many", "")
+ path = isolated_app.repo_path("alice", "many")
+ node = commit_file(path, "README.md", "# Many\n", message="initial", user=owner["username"])
+ for index in range(12):
+ isolated_app.run_git(["tag", f"v{index:02d}", node], cwd=path)
+
+ options = isolated_app.repo_ref_options(path)
+ named_options = [option for option in options if option["ref"]["type"] != isolated_app.REF_TYPE_TIP]
+ initial_options = [option for option in named_options if option["is_initial"]]
+
+ assert len(named_options) == 13
+ assert len(initial_options) == 10
+ assert not any(
+ option["is_initial"] for option in options if option["ref"]["type"] == isolated_app.REF_TYPE_TIP
+ )
+
+
+def test_bottle_signup_login_logout_and_new_repo_flow(isolated_app):
+ client = WsgiClient(isolated_app.app)
+
+ response = client.get("/new")
+ assert response.status_code == 303
+ assert response.location_path == "/login?next=/new"
+
+ response = client.get("/signup?next=/new")
+ assert response.status_code == 200
+ response = client.post(
+ "/signup",
+ {"username": "alice", "password": "password123", "next": "/new"},
+ )
+ assert response.status_code == 303
+ assert response.location_path == "/new"
+ assert "session" in client.cookies
+
+ response = client.get("/new")
+ assert response.status_code == 200
+ assert "Create repository" in response.text
+
+ response = client.post("/new", {"name": "demo", "description": "A test repository"})
+ assert response.status_code == 303
+ assert response.location_path == "/alice/demo"
+ assert isolated_app.get_repo("alice", "demo") is not None
+
+ response = client.get("/alice/demo")
+ assert response.status_code == 200
+ assert "This repository is empty." in response.text
+ assert "http://example.test/git/alice/demo" in response.text
+
+ response = client.post("/logout")
+ assert response.status_code == 303
+ assert response.location_path == "/"
+ assert "session" not in client.cookies
+
+ response = client.get("/new")
+ assert response.status_code == 303
+ assert response.location_path == "/login?next=/new"
+
+
+def test_bottle_repository_pages_render_refs_files_raw_and_errors(isolated_app):
+ owner = create_user("alice")
+ nodes = create_repo_with_refs(owner)
+ client = WsgiClient(isolated_app.app)
+ commit_short = nodes["default_node"][:12]
+
+ checks = [
+ ("/", 200, "Recent Activity"),
+ ("/alice", 200, "alice/demo"),
+ ("/alice?tab=stars", 200, "No starred repositories yet."),
+ ("/alice/demo", 200, "<h1>Demo</h1>"),
+ ("/alice/demo/src", 200, "README.md"),
+ ("/alice/demo/src/README.md", 200, "# Demo"),
+ (f"/alice/demo/commits/{commit_short}", 200, "initial"),
+ ("/alice/demo/commits", 200, "initial"),
+ ("/alice/demo/branches", 200, "feature"),
+ ("/alice/demo/branches", 200, "old"),
+ ("/alice/demo/tags", 200, "v1.0"),
+ ("/alice/demo/src?ref_type=branch&ref=feature", 200, "feature.txt"),
+ ("/alice/demo/src/feature.txt?ref_type=branch&ref=feature", 200, "feature work"),
+ ("/alice/demo/src?ref_type=tag&ref=v1.0", 200, "feature.txt"),
+ ("/alice/demo/commits?ref_type=tag&ref=v1.0", 200, "add feature"),
+ ("/alice/demo/issues", 200, "No open issues."),
+ ("/alice/demo/pulls", 200, "No open pull requests."),
+ ("/static/icon.png", 200, ""),
+ ("/favicon.ico", 204, ""),
+ ("/alice/missing", 404, "Repository not found."),
+ ("/alice/demo/src/missing.txt", 404, "Path not found."),
+ ("/alice/demo/src/docs/../secret", 400, "Invalid repository path."),
+ ("/alice/demo/src?ref_type=branch&ref=missing", 404, "Branch not found."),
+ ("/alice/demo/src?ref_type=tag&ref=missing", 404, "Tag not found."),
+ ]
+
+ for path, status_code, expected_text in checks:
+ response = client.get(path)
+ assert response.status_code == status_code, path
+ assert expected_text in response.text, path
+
+ repo_response = client.get("/alice/demo")
+ issues_response = client.get("/alice/demo/issues")
+ pulls_response = client.get("/alice/demo/pulls")
+ assert 'data-ref-label="tag v1.0"' in repo_response.text
+ assert 'data-ref-initial="true"' in repo_response.text
+ assert 'class="ref-picker"' in repo_response.text
+ assert 'class="ref-picker"' not in issues_response.text
+ assert 'class="ref-picker"' not in pulls_response.text
+
+ response = client.get("/alice/demo/raw/feature.txt?ref_type=branch&ref=feature")
+ assert response.status_code == 200
+ assert response.body == b"feature work\n"
+ assert response.header("Content-Type").startswith("text/plain")
+
+
+def test_bottle_profile_star_fork_and_repo_settings_flows(isolated_app):
+ owner = create_user("alice")
+ bob = create_user("bob")
+ create_repo_with_refs(owner)
+
+ bob_client = WsgiClient(isolated_app.app)
+ response = login_client(bob_client, "bob")
+ assert response.status_code == 303
+
+ response = bob_client.post("/alice/demo/star", {"action": "star"})
+ assert response.status_code == 303
+ assert isolated_app.repo_star_count(isolated_app.get_repo("alice", "demo")["id"]) == 1
+
+ response = bob_client.post("/alice/demo/fork", {"name": "demo", "description": "Forked"})
+ assert response.status_code == 303
+ assert response.location_path == "/bob/demo"
+ assert isolated_app.get_repo("bob", "demo") is not None
+ assert "Fork of <a href=\"/alice/demo\">alice/demo</a>" in bob_client.get("/bob/demo").text
+
+ owner_client = WsgiClient(isolated_app.app)
+ response = login_client(owner_client, "alice")
+ assert response.status_code == 303
+
+ response = owner_client.post(
+ "/settings/profile",
+ {"display_name": "Alice A.", "bio": "Git maintainer", "website": "example.com"},
+ )
+ assert response.status_code == 200
+ assert "Profile updated." in response.text
+ profile = owner_client.get("/alice")
+ assert "Alice A." in profile.text
+ assert "https://example.com" in profile.text
+ assert 'style="color:black;"' not in profile.text
+ settings_response = owner_client.get("/alice/demo/settings")
+ assert settings_response.status_code == 200
+ assert 'class="ref-picker"' not in settings_response.text
+
+ response = owner_client.post("/alice/demo/settings", {"action": "save", "description": "Updated"})
+ assert response.status_code == 200
+ assert "Repository settings updated." in response.text
+ assert isolated_app.get_repo("alice", "demo")["description"] == "Updated"
+
+ response = owner_client.post("/alice/demo/settings", {"action": "add_contributor", "username": "bob"})
+ assert response.status_code == 303
+ repo = isolated_app.get_repo("alice", "demo")
+ assert isolated_app.user_can_maintain_repo(bob, repo)
+
+ response = owner_client.post("/alice/demo/settings", {"action": "remove_contributor", "user_id": str(bob["id"])})
+ assert response.status_code == 303
+ assert not isolated_app.user_can_maintain_repo(bob, repo)
+
+ response = owner_client.post("/alice/demo/settings", {"action": "delete", "confirm_name": "wrong"})
+ assert response.status_code == 200
+ assert "Type "demo" to confirm deletion." in response.text
+
+
+def test_bottle_issue_routes_create_comment_close_and_reopen(isolated_app):
+ owner = create_user("alice")
+ create_repo_with_refs(owner)
+ client = WsgiClient(isolated_app.app)
+ login_client(client, "alice")
+
+ response = client.get("/alice/demo/issues/new")
+ assert response.status_code == 200
+ assert "Open issue" in response.text
+
+ response = client.post("/alice/demo/issues/new", {"title": "", "body": ""})
+ assert response.status_code == 200
+ assert "Issue title is required." in response.text
+
+ response = client.post("/alice/demo/issues/new", {"title": "Bug report", "body": "It fails"})
+ assert response.status_code == 303
+ assert response.location_path == "/alice/demo/issues/1"
+
+ response = client.get("/alice/demo/issues/1")
+ assert response.status_code == 200
+ assert "Bug report" in response.text
+ assert "It fails" in response.text
+ assert 'class="ref-picker"' not in response.text
+
+ response = client.post("/alice/demo/issues/1", {"action": "comment", "body": ""})
+ assert response.status_code == 200
+ assert "Comment body is required." in response.text
+
+ response = client.post("/alice/demo/issues/1", {"action": "comment", "body": "I can reproduce this"})
+ assert response.status_code == 303
+ assert "I can reproduce this" in client.get("/alice/demo/issues/1").text
+
+ response = client.post("/alice/demo/issues/1", {"action": "close"})
+ assert response.status_code == 303
+ assert isolated_app.get_issue(isolated_app.get_repo("alice", "demo")["id"], 1)["status"] == "closed"
+
+ response = client.post("/alice/demo/issues/1", {"action": "reopen"})
+ assert response.status_code == 303
+ assert isolated_app.get_issue(isolated_app.get_repo("alice", "demo")["id"], 1)["status"] == "open"
+
+
+def test_bottle_pull_request_routes_create_comment_forbid_and_merge(isolated_app):
+ owner = create_user("alice")
+ author = create_user("bob")
+ isolated_app.create_repository(owner, "demo", "")
+ target_path = isolated_app.repo_path("alice", "demo")
+ base_node = commit_file(target_path, "README.md", "# Demo\n", message="initial", user="alice")
+ target_repo = isolated_app.get_repo("alice", "demo")
+ isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
+ source_repo = isolated_app.get_repo("bob", "demo-fork")
+ source_path = isolated_app.repo_path("bob", "demo-fork")
+ source_node = commit_file(
+ source_path,
+ "feature.txt",
+ "new feature\n",
+ message="add feature",
+ user="bob",
+ branch="feature/pr",
+ )
+
+ bob_client = WsgiClient(isolated_app.app)
+ login_client(bob_client, "bob")
+
+ response = bob_client.get("/alice/demo/pulls/new")
+ assert response.status_code == 200
+ assert "bob/demo-fork HEAD" in response.text
+ assert "bob/demo-fork branch feature/pr" in response.text
+ assert 'class="ref-picker"' not in response.text
+
+ response = bob_client.post(
+ "/alice/demo/pulls/new",
+ {
+ "source_ref": isolated_app.source_ref_option_value(
+ source_repo["id"], isolated_app.REF_TYPE_BRANCH, "feature/pr"
+ ),
+ "target_ref": isolated_app.ref_option_value(isolated_app.REF_TYPE_BRANCH, "main"),
+ "title": "Add feature",
+ "body": "Please merge this",
+ },
+ )
+ assert response.status_code == 303
+ assert response.location_path == "/alice/demo/pulls/1"
+
+ pr = isolated_app.get_pull_request(target_repo["id"], 1)
+ assert pr["base_node"] == base_node
+ assert pr["source_node"] == source_node
+ assert pr["source_ref_type"] == isolated_app.REF_TYPE_BRANCH
+ assert pr["source_ref_name"] == "feature/pr"
+
+ response = bob_client.get("/alice/demo/pulls/1")
+ assert response.status_code == 200
+ assert "feature.txt" in response.text
+ assert "new feature" in response.text
+ assert 'class="ref-picker"' not in response.text
+
+ response = bob_client.post("/alice/demo/pulls/1", {"action": "comment", "body": "Looks ready"})
+ assert response.status_code == 303
+ assert "Looks ready" in bob_client.get("/alice/demo/pulls/1").text
+
+ response = bob_client.post("/alice/demo/pulls/1", {"action": "close"})
+ assert response.status_code == 403
+ assert "Only maintainers can update pull requests." in response.text
+
+ owner_client = WsgiClient(isolated_app.app)
+ login_client(owner_client, "alice")
+ response = owner_client.post("/alice/demo/pulls/1", {"action": "merge"})
+ assert response.status_code == 303
+
+ merged = isolated_app.get_pull_request(target_repo["id"], 1)
+ assert merged["status"] == "merged"
+ assert merged["merge_node"]
+ assert isolated_app.repo_has_revision(target_path, merged["merge_node"])
+ assert isolated_app.repo_has_revision(target_path, source_node)
+ response = owner_client.get("/alice/demo/pulls/1")
+ assert response.status_code == 200
+ assert "Merged by alice" in response.text
+
+
+def test_git_http_routes_are_public_for_reads_and_protect_writes(isolated_app):
+ owner = create_user("alice", password="owner-password")
+ create_user("bob", password="bob-password")
+ isolated_app.create_repository(owner, "demo", "")
+ path = isolated_app.repo_path("alice", "demo")
+ commit_file(path, "README.md", "# Demo\n", message="initial", user="alice")
+ client = WsgiClient(isolated_app.app)
+
+ response = client.get("/git/alice/demo/info/refs?service=git-upload-pack")
+ assert response.status_code == 200
+ assert b"git-upload-pack" in response.body
+
+ response = client.get("/git/alice/demo/info/refs?service=git-receive-pack")
+ assert response.status_code == 401
+ assert response.header("WWW-Authenticate") == 'Basic realm="GitMan"'
+ assert response.header("Connection") is None
+ assert "Authentication required." in response.text
+
+ response = client.get(
+ "/git/alice/demo/info/refs?service=git-receive-pack",
+ headers=basic_auth("alice", "wrong"),
+ )
+ assert response.status_code == 401
+ assert response.header("Connection") is None
+ assert "Invalid Git credentials." in response.text
+
+ response = client.get(
+ "/git/alice/demo/info/refs?service=git-receive-pack",
+ headers=basic_auth("bob", "bob-password"),
+ )
+ assert response.status_code == 403
+ assert response.header("Connection") is None
+ assert "Push not authorized" in response.text
+
+ hook = path / "hooks" / "post-receive"
+ hook.unlink()
+ isolated_app.run_git(["config", "--unset", "receive.denyDeleteCurrent"], cwd=path, check=False)
+ response = client.get(
+ "/git/alice/demo/info/refs?service=git-receive-pack",
+ headers=basic_auth("alice", "owner-password"),
+ )
+ assert response.status_code == 200
+ assert b"git-receive-pack" in response.body
+ assert hook.exists()
+ assert isolated_app.run_git(["config", "receive.denyDeleteCurrent"], cwd=path).stdout.strip() == "warn"