import atexit
import base64
import json
from http.cookies import SimpleCookie
from io import BytesIO, StringIO
import os
import re
import shutil
import subprocess
import sys
import tempfile
from pathlib import Path
from urllib.parse import urlencode, urlsplit, unquote
import pytest
from bottle import HTTPError
REPO_ROOT_FOR_IMPORTS = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(REPO_ROOT_FOR_IMPORTS))
BOOTSTRAP_DIR = Path(tempfile.mkdtemp(prefix="gitman-test-bootstrap-"))
atexit.register(shutil.rmtree, BOOTSTRAP_DIR, ignore_errors=True)
os.environ["GITMAN_DB"] = str(BOOTSTRAP_DIR / "gitman.sqlite3")
os.environ["GITMAN_REPO_ROOT"] = str(BOOTSTRAP_DIR / "repos")
os.environ["SECRET_KEY"] = "test-secret"
import app as gitman # noqa: E402
class WsgiResponse:
def __init__(self, status_line, headers, body):
self.status_line = status_line
self.status_code = int(status_line.split(" ", 1)[0])
self.headers = headers
self.body = body
self.text = body.decode("utf-8", "replace")
def header(self, name, default=None):
name = name.lower()
for key, value in self.headers:
if key.lower() == name:
return value
return default
@property
def location(self):
return self.header("Location")
@property
def location_path(self):
location = self.location
if not location:
return None
split = urlsplit(location)
if not split.scheme and not split.netloc:
return location
return split.path + (f"?{split.query}" if split.query else "")
class WsgiClient:
def __init__(self, wsgi_app):
self.wsgi_app = wsgi_app
self.cookies = {}
self.csrf_token = None
def get(self, path, headers=None):
return self.request("GET", path, headers=headers)
def post(self, path, data=None, headers=None):
return self.request("POST", path, data=data, headers=headers)
def request(self, method, path, data=None, headers=None):
headers = headers or {}
split = urlsplit(path)
body = b""
if method == "POST" and data is None and not split.path.startswith("/git/") and self.csrf_token:
data = {}
if data is not None:
if (
method == "POST"
and not split.path.startswith("/git/")
and gitman.CSRF_FORM_FIELD not in data
and self.csrf_token
):
data = {**data, gitman.CSRF_FORM_FIELD: self.csrf_token}
body = urlencode(data, doseq=True).encode("utf-8")
headers = {"Content-Type": "application/x-www-form-urlencoded", **headers}
environ = {
"REQUEST_METHOD": method,
"SCRIPT_NAME": "",
"PATH_INFO": unquote(split.path),
"QUERY_STRING": split.query,
"SERVER_NAME": "example.test",
"SERVER_PORT": "80",
"SERVER_PROTOCOL": "HTTP/1.1",
"HTTP_HOST": "example.test",
"wsgi.version": (1, 0),
"wsgi.url_scheme": "http",
"wsgi.input": BytesIO(body),
"wsgi.errors": StringIO(),
"wsgi.multithread": False,
"wsgi.multiprocess": False,
"wsgi.run_once": False,
"CONTENT_LENGTH": str(len(body)),
"REMOTE_ADDR": "127.0.0.1",
}
if self.cookies:
environ["HTTP_COOKIE"] = "; ".join(f"{key}={value}" for key, value in self.cookies.items())
for key, value in headers.items():
env_key = key.upper().replace("-", "_")
if env_key == "CONTENT_TYPE":
environ["CONTENT_TYPE"] = value
elif env_key == "CONTENT_LENGTH":
environ["CONTENT_LENGTH"] = value
else:
environ[f"HTTP_{env_key}"] = value
captured = {}
def start_response(status, response_headers, exc_info=None):
captured["status"] = status
captured["headers"] = response_headers
body_iter = self.wsgi_app(environ, start_response)
try:
response_body = b"".join(
chunk if isinstance(chunk, bytes) else chunk.encode("utf-8") for chunk in body_iter
)
finally:
close = getattr(body_iter, "close", None)
if close:
close()
response = WsgiResponse(captured["status"], captured["headers"], response_body)
self._store_cookies(response.headers)
self._store_csrf_token(response.text)
return response
def _store_cookies(self, headers):
for key, value in headers:
if key.lower() != "set-cookie":
continue
cookie = SimpleCookie()
cookie.load(value)
for name, morsel in cookie.items():
if morsel.value == "" and (morsel["expires"] or morsel["max-age"] == "0"):
self.cookies.pop(name, None)
else:
self.cookies[name] = morsel.value
def _store_csrf_token(self, text):
match = re.search(r'name="_csrf_token"\s+value="([^"]+)"', text)
if match:
self.csrf_token = match.group(1)
def login_client(client, username, password="correct horse battery staple", next_url="/"):
client.get("/login")
return client.post("/login", {"username": username, "password": password, "next": next_url})
@pytest.fixture()
def isolated_app(tmp_path, monkeypatch):
monkeypatch.setattr(gitman, "DB_PATH", tmp_path / "gitman.sqlite3")
monkeypatch.setattr(gitman, "REPO_ROOT", tmp_path / "repos")
gitman.AUTH_FAILURES.clear()
gitman.init_db()
return gitman
def create_user(username, password="correct horse battery staple"):
with gitman.db_connect() as conn:
conn.execute(
"INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
(username, gitman.hash_password(password), gitman.utcnow()),
)
return gitman.get_user_by_username(username)
def commit_file(repo_path, relative_path, content, message="initial commit", user="alice", branch=None):
with tempfile.TemporaryDirectory(prefix="gitman-test-work-") as tempdir:
work_path = Path(tempdir) / "work"
gitman.run_git(["clone", str(repo_path), str(work_path)], timeout=60)
gitman.run_git(["config", "user.name", user], cwd=work_path)
gitman.run_git(["config", "user.email", "[email protected]"], cwd=work_path)
if branch:
checkout = gitman.run_git(["checkout", branch], cwd=work_path, check=False)
if checkout.returncode != 0:
gitman.run_git(["checkout", "-b", branch], cwd=work_path)
target = work_path / relative_path
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(content, encoding="utf-8")
gitman.run_git(["add", relative_path], cwd=work_path)
gitman.run_git(["commit", "-m", message], cwd=work_path)
node = gitman.run_git(["rev-parse", "HEAD"], cwd=work_path).stdout.strip()
current_branch = gitman.run_git(["branch", "--show-current"], cwd=work_path).stdout.strip() or "main"
gitman.run_git(["push", "origin", f"HEAD:refs/heads/{current_branch}"], cwd=work_path, timeout=60)
return node
def basic_auth(username, password):
token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
return {"Authorization": f"Basic {token}"}
def create_repo_with_refs(owner):
gitman.create_repository(owner, "demo", "Demo repository")
path = gitman.repo_path(owner["username"], "demo")
default_node = commit_file(path, "README.md", "# Demo\n", message="initial", user=owner["username"])
feature_node = commit_file(
path,
"feature.txt",
"feature work\n",
message="add feature",
user=owner["username"],
branch="feature",
)
gitman.run_git(["tag", "v1.0", feature_node], cwd=path)
old_node = commit_file(
path,
"old.txt",
"old branch\n",
message="old branch work",
user=owner["username"],
branch="old",
)
return {
"path": path,
"default_node": default_node,
"feature_node": feature_node,
"old_node": old_node,
}
def test_normalize_slug_accepts_trimmed_lowercase_values():
assert gitman.normalize_slug(" Demo_Repo-1 ", "Repository name") == "demo_repo-1"
assert gitman.normalize_slug("My.Name", "Repository name") == "my.name"
@pytest.mark.parametrize(
("value", "label"),
[
("x", "Repository name"),
("bad/name", "Repository name"),
("-bad", "Repository name"),
("demo.git", "Repository name"),
("login", "Username"),
("harrisonerd", "Username"),
],
)
def test_normalize_slug_rejects_invalid_or_reserved_values(value, label):
with pytest.raises(ValueError):
gitman.normalize_slug(value, label)
def test_clean_repo_path_normalizes_and_rejects_traversal():
assert gitman.clean_repo_path("/docs/readme.md/") == "docs/readme.md"
assert gitman.clean_repo_path("") == ""
for value in ("../secret", "docs/../secret", "./file"):
with pytest.raises(HTTPError) as exc_info:
gitman.clean_repo_path(value)
assert exc_info.value.status_code == 400
def test_password_hashes_verify_and_reject_bad_inputs():
stored = gitman.hash_password("correct-password")
assert stored.startswith("pbkdf2_sha256$")
assert gitman.verify_password("correct-password", stored)
assert not gitman.verify_password("wrong-password", stored)
assert not gitman.verify_password("correct-password", "not-a-valid-hash")
@pytest.mark.parametrize(
("value", "expected"),
[
("example.com", "https://example.com"),
(" http://example.com/path ", "http://example.com/path"),
("https://example.com", "https://example.com"),
],
)
def test_normalize_website_accepts_http_urls_and_adds_scheme(value, expected):
assert gitman.normalize_website(value) == expected
@pytest.mark.parametrize("value", ["ftp://example.com", "https://", "not a url"])
def test_normalize_website_rejects_invalid_urls(value):
with pytest.raises(ValueError):
gitman.normalize_website(value)
def test_render_markdown_strips_scripts_and_unsafe_links():
rendered = gitman.render_markdown(
"""
# Title
<script>alert("x")</script>
[safe](https://example.com) [unsafe](javascript:alert(1))
| A |
| - |
| B |
"""
)
assert "<h1>Title</h1>" in rendered
assert "script" not in rendered.lower()
assert "javascript:" not in rendered.lower()
assert 'href="https://example.com"' in rendered
assert "<table>" in rendered
def test_render_markdown_links_allows_only_links():
rendered = gitman.render_markdown_links("[site](https://example.com) **bold** <strong>x</strong>")
assert rendered == '<a href="https://example.com">site</a> bold <strong>x</strong>'
def test_startup_config_rejects_default_secret_outside_debug(monkeypatch):
monkeypatch.setattr(gitman, "DEBUG", False)
monkeypatch.setattr(gitman, "SECRET_KEY", gitman.DEFAULT_SECRET_KEY)
with pytest.raises(RuntimeError):
gitman.validate_startup_config()
monkeypatch.setattr(gitman, "SECRET_KEY", "production-secret")
gitman.validate_startup_config()
def test_security_headers_and_secure_cookie_flags(isolated_app):
client = WsgiClient(isolated_app.app)
response = client.get("/login", headers={"X-Forwarded-Proto": "https"})
assert response.status_code == 200
assert response.header("X-Content-Type-Options") == "nosniff"
assert response.header("Referrer-Policy") == "same-origin"
assert response.header("X-Frame-Options") == "DENY"
assert "frame-ancestors 'none'" in response.header("Content-Security-Policy")
csrf_cookie = response.header("Set-Cookie")
assert "csrf_token=" in csrf_cookie
assert "HttpOnly" in csrf_cookie
assert "samesite=lax" in csrf_cookie.lower()
assert "Secure" in csrf_cookie
create_user("alice")
response = client.post(
"/login",
{"username": "alice", "password": "correct horse battery staple", "next": "/"},
headers={"X-Forwarded-Proto": "https"},
)
assert response.status_code == 303
session_cookie = response.header("Set-Cookie")
assert "session=" in session_cookie
assert "HttpOnly" in session_cookie
assert "samesite=lax" in session_cookie.lower()
assert "Secure" in session_cookie
def test_styles_support_system_dark_mode(isolated_app):
css = (REPO_ROOT_FOR_IMPORTS / "static" / "styles.css").read_text(encoding="utf-8")
client = WsgiClient(isolated_app.app)
response = client.get("/")
assert "@media (prefers-color-scheme: dark)" in css
assert "color-scheme: dark" in css
assert "var(--page-bg)" in css
assert "github-dark.min.css" in response.text
assert 'media="(prefers-color-scheme: dark)"' in response.text
def test_csrf_required_for_browser_posts_and_git_is_exempt(isolated_app):
owner = create_user("alice", password="owner-password")
isolated_app.create_repository(owner, "demo", "")
client = WsgiClient(isolated_app.app)
response = client.post("/login", {"username": "alice", "password": "owner-password", "next": "/"})
assert response.status_code == 403
assert "Invalid CSRF token." in response.text
client.get("/login")
response = client.post(
"/login",
{"username": "alice", "password": "owner-password", "next": "/", gitman.CSRF_FORM_FIELD: "bad-token"},
)
assert response.status_code == 403
response = client.post("/git/alice/demo/git-receive-pack")
assert response.status_code == 401
assert response.header("WWW-Authenticate") == 'Basic realm="GitMan"'
assert response.header("Connection") is None
def test_browser_form_size_limit(isolated_app, monkeypatch):
monkeypatch.setattr(gitman, "MAX_FORM_BYTES", 40)
client = WsgiClient(isolated_app.app)
client.get("/login")
response = client.post("/login", {"username": "a" * 100, "password": "bad", "next": "/"})
assert response.status_code == 413
assert "Request body too large." in response.text
def test_login_and_git_auth_failures_are_rate_limited(isolated_app, monkeypatch):
monkeypatch.setattr(gitman, "RATE_LIMIT_MAX_FAILURES", 2)
monkeypatch.setattr(gitman, "RATE_LIMIT_COOLDOWN_SECONDS", 60)
create_user("alice", password="owner-password")
client = WsgiClient(isolated_app.app)
client.get("/login")
for _ in range(2):
response = client.post("/login", {"username": "alice", "password": "wrong", "next": "/"})
assert response.status_code == 200
response = client.post("/login", {"username": "alice", "password": "wrong", "next": "/"})
assert response.status_code == 429
assert response.header("Retry-After") == "60"
gitman.AUTH_FAILURES.clear()
owner = gitman.get_user_by_username("alice")
isolated_app.create_repository(owner, "demo", "")
for _ in range(2):
response = client.post("/git/alice/demo/git-receive-pack", headers=basic_auth("alice", "wrong"))
assert response.status_code == 401
assert response.header("Connection") is None
response = client.post("/git/alice/demo/git-receive-pack", headers=basic_auth("alice", "wrong"))
assert response.status_code == 429
assert response.header("Connection") is None
def test_readme_and_file_previews_are_truncated(isolated_app, monkeypatch):
monkeypatch.setattr(gitman, "MAX_RENDER_BYTES", 32)
owner = create_user("alice")
isolated_app.create_repository(owner, "demo", "")
commit_file(isolated_app.repo_path("alice", "demo"), "README.md", "A" * 200, message="large readme")
client = WsgiClient(isolated_app.app)
response = client.get("/alice/demo")
assert response.status_code == 200
assert "README preview truncated." in response.text
response = client.get("/alice/demo/src/README.md")
assert response.status_code == 200
assert "File preview truncated." in response.text
def test_build_tree_deduplicates_entries_and_sorts_directories_first():
files = ["README.md", "src/app.py", "src/utils/helpers.py", "docs/index.md", "src/z.txt"]
assert gitman.build_tree(files, "") == [
{"name": "docs", "path": "docs", "type": "dir"},
{"name": "src", "path": "src", "type": "dir"},
{"name": "README.md", "path": "README.md", "type": "file"},
]
assert gitman.build_tree(files, "src") == [
{"name": "utils", "path": "src/utils", "type": "dir"},
{"name": "app.py", "path": "src/app.py", "type": "file"},
{"name": "z.txt", "path": "src/z.txt", "type": "file"},
]
def test_ref_option_values_round_trip_quoted_names():
branch_name = "feature/a|b c"
value = gitman.ref_option_value(gitman.REF_TYPE_BRANCH, branch_name)
assert value == "branch|feature%2Fa%7Cb%20c"
assert gitman.parse_ref_option_value(value) == (gitman.REF_TYPE_BRANCH, branch_name)
tag_name = "release/a|b c"
tag_value = gitman.ref_option_value(gitman.REF_TYPE_TAG, tag_name)
assert tag_value == "tag|release%2Fa%7Cb%20c"
assert gitman.parse_ref_option_value(tag_value) == (gitman.REF_TYPE_TAG, tag_name)
def test_source_ref_option_values_round_trip_and_validate_source_repo():
branch_name = "feature/a|b"
value = gitman.source_ref_option_value(42, gitman.REF_TYPE_BRANCH, branch_name)
assert gitman.parse_source_ref_option_value(value) == (42, gitman.REF_TYPE_BRANCH, branch_name)
with pytest.raises(ValueError, match="Invalid source repository"):
gitman.parse_source_ref_option_value(f"not-int|{gitman.REF_TYPE_BRANCH}|main")
with pytest.raises(ValueError, match="Invalid source ref"):
gitman.parse_source_ref_option_value(f"42|{gitman.REF_TYPE_COMMIT}|abc123")
def test_ref_query_and_url_helpers_skip_default_refs_unless_forced():
ref = {"type": gitman.REF_TYPE_BRANCH, "name": "main", "is_default": True}
assert gitman.ref_query_string(ref) == ""
assert gitman.ref_query_string(ref, force=True) == "ref_type=branch&ref=main"
assert gitman.url_with_ref("/alice/demo", ref, force=True) == "/alice/demo?ref_type=branch&ref=main"
assert gitman.url_with_ref("/alice/demo?tab=files", ref, force=True) == (
"/alice/demo?tab=files&ref_type=branch&ref=main"
)
def test_format_ref_label_and_option_label():
assert gitman.format_ref_label(gitman.REF_TYPE_TIP) == "HEAD"
assert gitman.format_ref_label(gitman.REF_TYPE_TAG, "v1.0") == "tag v1.0"
assert gitman.format_ref_label(gitman.REF_TYPE_COMMIT, "abcdef1234567890") == "commit abcdef123456"
assert gitman.ref_option_label({"type": gitman.REF_TYPE_BRANCH, "name": "old", "closed": False}) == "branch old"
assert gitman.ref_option_label({"type": gitman.REF_TYPE_TAG, "name": "v1.0", "local": False}) == "tag v1.0"
def test_init_db_creates_expected_tables_and_is_idempotent(isolated_app):
isolated_app.init_db()
with isolated_app.db_connect() as conn:
tables = {
row["name"]
for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
}
user_columns = {row["name"] for row in conn.execute("PRAGMA table_info(users)")}
repo_columns = {row["name"] for row in conn.execute("PRAGMA table_info(repositories)")}
pr_columns = {row["name"] for row in conn.execute("PRAGMA table_info(pull_requests)")}
assert {"users", "repositories", "issues", "pull_requests", "repo_stars"}.issubset(tables)
assert {"display_name", "bio", "website"}.issubset(user_columns)
assert {"forked_from_repo_id", "forked_at", "forked_from_node"}.issubset(repo_columns)
assert {"target_ref_type", "target_ref_name", "source_ref_type", "source_ref_name"}.issubset(pr_columns)
def test_db_connect_configures_sqlite_for_worker_contention(isolated_app):
isolated_app.init_db()
with isolated_app.db_connect() as conn:
foreign_keys = conn.execute("PRAGMA foreign_keys").fetchone()[0]
busy_timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
synchronous = conn.execute("PRAGMA synchronous").fetchone()[0]
journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
assert foreign_keys == 1
assert busy_timeout == gitman.SQLITE_BUSY_TIMEOUT_MS
assert synchronous == 1
assert journal_mode.lower() == "wal"
def test_sqlite_accepts_concurrent_worker_writes(isolated_app):
isolated_app.init_db()
repo_root = Path(__file__).resolve().parents[1]
env = os.environ.copy()
existing_pythonpath = env.get("PYTHONPATH")
env["PYTHONPATH"] = str(repo_root) if not existing_pythonpath else f"{repo_root}{os.pathsep}{existing_pythonpath}"
worker_script = """
import os
import sys
import time
os.environ["GITMAN_DB"] = sys.argv[1]
os.environ["GITMAN_REPO_ROOT"] = sys.argv[2]
os.environ["SECRET_KEY"] = "test-secret"
import app as gitman
with gitman.db_connect() as conn:
conn.execute("BEGIN IMMEDIATE")
conn.execute(
"INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?)",
(sys.argv[3], "hash", gitman.utcnow()),
)
time.sleep(0.2)
"""
processes = [
subprocess.Popen(
[
sys.executable,
"-c",
worker_script,
str(isolated_app.DB_PATH),
str(isolated_app.REPO_ROOT),
f"worker-{index}",
],
cwd=repo_root,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
for index in range(3)
]
results = []
try:
for process in processes:
stdout, stderr = process.communicate(timeout=15)
results.append((process.returncode, stdout, stderr))
finally:
for process in processes:
if process.poll() is None:
process.kill()
assert results
for returncode, stdout, stderr in results:
assert returncode == 0, stdout + stderr
with isolated_app.db_connect() as conn:
count = conn.execute("SELECT COUNT(*) FROM users WHERE username LIKE 'worker-%'").fetchone()[0]
assert count == 3
def test_create_repository_persists_metadata_and_writes_git_config(isolated_app):
owner = create_user("alice")
isolated_app.create_repository(owner, "demo", "line one\nline two")
repo = isolated_app.get_repo("alice", "demo")
path = isolated_app.repo_path("alice", "demo")
assert repo["owner_username"] == "alice"
assert repo["name"] == "demo"
assert path.joinpath("objects").is_dir()
assert path.joinpath("HEAD").read_text(encoding="utf-8").strip() == "ref: refs/heads/main"
assert path.joinpath("description").read_text(encoding="utf-8") == "line one line two"
assert isolated_app.run_git(["config", "gitman.owner"], cwd=path).stdout.strip() == "alice"
assert isolated_app.run_git(["config", "gitman.name"], cwd=path).stdout.strip() == "demo"
assert isolated_app.run_git(["config", "gitman.description"], cwd=path).stdout.strip() == "line one line two"
assert isolated_app.run_git(["config", "http.receivepack"], cwd=path).stdout.strip() == "true"
assert isolated_app.run_git(["config", "receive.denyDeleteCurrent"], cwd=path).stdout.strip() == "warn"
def test_create_repository_rolls_back_database_and_files_when_git_init_fails(isolated_app, monkeypatch):
owner = create_user("alice")
def fail_git(*args, **kwargs):
raise gitman.GitCommandError("init failed")
monkeypatch.setattr(isolated_app, "run_git", fail_git)
with pytest.raises(gitman.GitCommandError, match="init failed"):
isolated_app.create_repository(owner, "broken", "")
assert isolated_app.get_repo("alice", "broken") is None
assert not isolated_app.repo_path("alice", "broken").exists()
def test_star_and_contributor_helpers_update_database_and_git_config(isolated_app):
owner = create_user("alice")
contributor = create_user("bob")
isolated_app.create_repository(owner, "demo", "")
repo = isolated_app.get_repo("alice", "demo")
isolated_app.star_repo(contributor, repo)
assert isolated_app.repo_star_count(repo["id"]) == 1
assert isolated_app.user_starred_repo(contributor, repo)
isolated_app.add_repo_contributor(repo, owner, "bob")
assert isolated_app.user_can_maintain_repo(contributor, repo)
repo_path = isolated_app.repo_path("alice", "demo")
assert isolated_app.run_git(["config", "http.receivepack"], cwd=repo_path).stdout.strip() == "true"
isolated_app.unstar_repo(contributor, repo)
isolated_app.remove_repo_contributor(repo, contributor["id"])
assert isolated_app.repo_star_count(repo["id"]) == 0
assert not isolated_app.user_starred_repo(contributor, repo)
assert not isolated_app.user_can_maintain_repo(contributor, repo)
def test_issue_queries_count_filter_and_order_comments(isolated_app):
owner = create_user("alice")
isolated_app.create_repository(owner, "demo", "")
repo = isolated_app.get_repo("alice", "demo")
now = isolated_app.utcnow()
with isolated_app.db_connect() as conn:
conn.execute(
"""
INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at)
VALUES (?, ?, 1, 'open issue', 'body', 'open', ?, ?)
""",
(repo["id"], owner["id"], now, now),
)
conn.execute(
"""
INSERT INTO issues (repo_id, author_id, number, title, body, status, created_at, updated_at, closed_at)
VALUES (?, ?, 2, 'closed issue', '', 'closed', ?, ?, ?)
""",
(repo["id"], owner["id"], now, now, now),
)
issue_id = conn.execute("SELECT id FROM issues WHERE number = 1").fetchone()["id"]
conn.execute(
"""
INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
VALUES (?, ?, 'second', '2026-01-01T00:00:02Z', '2026-01-01T00:00:02Z')
""",
(issue_id, owner["id"]),
)
conn.execute(
"""
INSERT INTO issue_comments (issue_id, author_id, body, created_at, updated_at)
VALUES (?, ?, 'first', '2026-01-01T00:00:01Z', '2026-01-01T00:00:01Z')
""",
(issue_id, owner["id"]),
)
assert isolated_app.issue_counts(repo["id"]) == {"open": 1, "closed": 1}
assert [issue["number"] for issue in isolated_app.list_issues(repo["id"], "all")] == [2, 1]
assert [issue["number"] for issue in isolated_app.list_issues(repo["id"], "invalid")] == [1]
assert [comment["body"] for comment in isolated_app.list_issue_comments(issue_id)] == ["first", "second"]
def test_git_read_helpers_return_files_readme_commits_and_default_ref(isolated_app):
owner = create_user("alice")
isolated_app.create_repository(owner, "demo", "")
path = isolated_app.repo_path("alice", "demo")
node = commit_file(path, "README.md", "# Demo\n", message="add readme", user="Alice <[email protected]>")
files = isolated_app.git_files(path)
readme_name, readme = isolated_app.readme_for_repo(path, files)
commits = isolated_app.commit_log(path)
ref = isolated_app.default_code_ref(path)
assert files == ["README.md"]
assert (readme_name, readme) == ("README.md", "# Demo\n")
assert commits[0]["summary"] == "add readme"
assert isolated_app.commit_count(path) == 1
assert ref["type"] == isolated_app.REF_TYPE_BRANCH
assert ref["name"] == "main"
assert ref["node"] == node
assert isolated_app.repo_has_revision(path, node)
assert isolated_app.is_ancestor(path, isolated_app.NULL_REV, node)
def test_first_pushed_master_branch_becomes_default_and_stale_head_falls_back(isolated_app):
owner = create_user("alice")
isolated_app.create_repository(owner, "demo", "")
path = isolated_app.repo_path("alice", "demo")
node = commit_file(path, "README.md", "# Demo\n", message="initial", user="alice", branch="master")
assert isolated_app.repo_head_branch(path) == "master"
assert isolated_app.default_code_ref(path)["name"] == "master"
assert isolated_app.default_code_ref(path)["node"] == node
assert isolated_app.commit_count(path) == 1
assert isolated_app.commit_log(path)[0]["node"] == node
isolated_app.run_git(["symbolic-ref", "HEAD", "refs/heads/main"], cwd=path)
assert isolated_app.repo_tip_node(path) is None
client = WsgiClient(isolated_app.app)
overview = client.get("/alice/demo")
commits = client.get("/alice/demo/commits")
assert overview.status_code == 200
assert "<h1>Demo</h1>" in overview.text
assert "This repository is empty." not in overview.text
assert "branch master" in overview.text
assert commits.status_code == 200
assert "initial" in commits.text
assert "Commits (1)" in commits.text
assert isolated_app.repo_head_branch(path) == "master"
def test_deleting_current_branch_moves_head_to_surviving_branch(isolated_app):
owner = create_user("alice")
isolated_app.create_repository(owner, "demo", "")
path = isolated_app.repo_path("alice", "demo")
main_node = commit_file(path, "README.md", "# Demo\n", message="initial", user="alice")
commit_file(path, "LEGACY.md", "legacy\n", message="legacy branch", user="alice", branch="master")
isolated_app.run_git(["symbolic-ref", "HEAD", "refs/heads/master"], cwd=path)
with tempfile.TemporaryDirectory(prefix="gitman-delete-branch-") as tempdir:
work_path = Path(tempdir) / "work"
isolated_app.run_git(["clone", str(path), str(work_path)], timeout=60)
isolated_app.run_git(["push", "origin", "--delete", "master"], cwd=work_path, timeout=60)
assert isolated_app.run_git(["show-ref", "--verify", "refs/heads/master"], cwd=path, check=False).returncode != 0
assert isolated_app.repo_head_branch(path) == "main"
assert isolated_app.default_code_ref(path)["node"] == main_node
def test_create_pull_request_between_fork_repositories(isolated_app):
owner = create_user("alice")
author = create_user("bob")
isolated_app.create_repository(owner, "demo", "")
target_path = isolated_app.repo_path("alice", "demo")
base_node = commit_file(target_path, "README.md", "# Demo\n", message="initial", user="alice")
target_repo = isolated_app.get_repo("alice", "demo")
isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
source_repo = isolated_app.get_repo("bob", "demo-fork")
source_path = isolated_app.repo_path("bob", "demo-fork")
source_node = commit_file(source_path, "feature.txt", "new feature\n", message="add feature", user="bob")
number = isolated_app.create_pull_request(
target_repo,
source_repo,
author,
"Add feature",
"Please merge this",
isolated_app.REF_TYPE_TIP,
"",
isolated_app.REF_TYPE_BRANCH,
"main",
)
pr = isolated_app.get_pull_request(target_repo["id"], number)
diff, current_source_node, source_ref = isolated_app.pull_request_diff(pr)
assert number == 1
assert pr["base_node"] == base_node
assert pr["source_node"] == source_node
assert pr["source_owner_username"] == "bob"
assert pr["target_owner_username"] == "alice"
assert current_source_node == source_node
assert source_ref["type"] == isolated_app.REF_TYPE_TIP
assert "feature.txt" in diff
assert "new feature" in diff
def test_pull_request_source_refs_can_use_fork_branches(isolated_app):
owner = create_user("alice")
author = create_user("bob")
isolated_app.create_repository(owner, "demo", "")
target_path = isolated_app.repo_path("alice", "demo")
commit_file(target_path, "README.md", "# Demo\n", message="initial", user="alice")
target_repo = isolated_app.get_repo("alice", "demo")
isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
source_repo = isolated_app.get_repo("bob", "demo-fork")
source_path = isolated_app.repo_path("bob", "demo-fork")
branch_node = commit_file(
source_path,
"feature.txt",
"branch feature\n",
message="branch feature",
user="bob",
branch="feature/pr",
)
source_labels = [option["label"] for option in isolated_app.source_repo_ref_options(source_repo)]
assert "bob/demo-fork branch feature/pr" in source_labels
assert not any(" tag " in label for label in source_labels)
branch_pr_number = isolated_app.create_pull_request(
target_repo,
source_repo,
author,
"Branch PR",
"",
isolated_app.REF_TYPE_BRANCH,
"feature/pr",
isolated_app.REF_TYPE_BRANCH,
"main",
)
branch_pr = isolated_app.get_pull_request(target_repo["id"], branch_pr_number)
branch_diff, branch_source_node, branch_source_ref = isolated_app.pull_request_diff(branch_pr)
assert branch_pr["source_ref_type"] == isolated_app.REF_TYPE_BRANCH
assert branch_pr["source_ref_name"] == "feature/pr"
assert branch_pr["source_node"] == branch_node
assert branch_source_node == branch_node
assert branch_source_ref["type"] == isolated_app.REF_TYPE_BRANCH
assert "branch feature" in branch_diff
def test_pull_requests_can_use_branches_from_the_target_repository(isolated_app):
owner = create_user("alice")
create_user("bob")
isolated_app.create_repository(owner, "demo", "")
path = isolated_app.repo_path("alice", "demo")
base_node = commit_file(path, "README.md", "# Demo\n", message="initial", user="alice")
source_node = commit_file(
path,
"feature.txt",
"in repo branch\n",
message="add in-repo feature",
user="alice",
branch="feature/in-repo",
)
target_repo = isolated_app.get_repo("alice", "demo")
bob_client = WsgiClient(isolated_app.app)
login_client(bob_client, "bob")
response = bob_client.get("/alice/demo/pulls/new")
assert response.status_code == 200
assert "alice/demo branch feature/in-repo" in response.text
assert "This repository has no source branches yet." not in response.text
response = bob_client.post(
"/alice/demo/pulls/new",
{
"source_ref": isolated_app.source_ref_option_value(
target_repo["id"], isolated_app.REF_TYPE_BRANCH, "feature/in-repo"
),
"target_ref": isolated_app.ref_option_value(isolated_app.REF_TYPE_BRANCH, "main"),
"title": "Add in-repo feature",
"body": "Please merge this branch",
},
)
assert response.status_code == 303
assert response.location_path == "/alice/demo/pulls/1"
pr = isolated_app.get_pull_request(target_repo["id"], 1)
assert pr["source_repo_id"] == target_repo["id"]
assert pr["base_node"] == base_node
assert pr["source_node"] == source_node
assert pr["source_ref_type"] == isolated_app.REF_TYPE_BRANCH
assert pr["source_ref_name"] == "feature/in-repo"
diff, current_source_node, source_ref = isolated_app.pull_request_diff(pr)
assert current_source_node == source_node
assert source_ref["name"] == "feature/in-repo"
assert "feature.txt" in diff
owner_client = WsgiClient(isolated_app.app)
login_client(owner_client, "alice")
response = owner_client.post("/alice/demo/pulls/1", {"action": "merge"})
assert response.status_code == 303
assert isolated_app.run_git(["rev-parse", "refs/heads/main"], cwd=path).stdout.strip() == source_node
def test_branch_tag_helpers_resolve_and_filter_refs(isolated_app):
owner = create_user("alice")
nodes = create_repo_with_refs(owner)
path = nodes["path"]
branches = {branch["name"]: branch for branch in isolated_app.list_repo_branches(path)}
tags = isolated_app.list_repo_tags(path)
target_labels = [option["label"] for option in isolated_app.target_repo_ref_options(path)]
all_labels = [option["label"] for option in isolated_app.repo_ref_options(path)]
source_labels = [
option["label"] for option in isolated_app.source_repo_ref_options(isolated_app.get_repo("alice", "demo"))
]
assert {"main", "feature", "old"}.issubset(branches)
assert branches["main"]["node"] == nodes["default_node"]
assert branches["feature"]["closed"] is False
assert branches["old"]["closed"] is False
assert tags[0]["name"] == "v1.0"
assert tags[0]["type"] == isolated_app.REF_TYPE_TAG
assert tags[0]["node"] == nodes["feature_node"]
assert isolated_app.default_code_ref(path)["node"] == nodes["default_node"]
assert isolated_app.resolve_repo_ref(path, isolated_app.REF_TYPE_BRANCH, "feature")["name"] == "feature"
assert isolated_app.resolve_repo_ref(path, isolated_app.REF_TYPE_TAG, "v1.0")["node"] == nodes["feature_node"]
assert isolated_app.commit_ref(path, nodes["feature_node"])["type"] == isolated_app.REF_TYPE_COMMIT
assert "tag v1.0" in all_labels
assert "tag v1.0" not in target_labels
assert "HEAD" not in target_labels
assert "branch old" in target_labels
assert "branch old" in all_labels
assert "alice/demo tag v1.0" not in source_labels
def test_repo_ref_options_mark_ten_newest_named_refs_for_picker(isolated_app):
owner = create_user("alice")
isolated_app.create_repository(owner, "many", "")
path = isolated_app.repo_path("alice", "many")
node = commit_file(path, "README.md", "# Many\n", message="initial", user=owner["username"])
for index in range(12):
isolated_app.run_git(["tag", f"v{index:02d}", node], cwd=path)
options = isolated_app.repo_ref_options(path)
named_options = [option for option in options if option["ref"]["type"] != isolated_app.REF_TYPE_TIP]
initial_options = [option for option in named_options if option["is_initial"]]
assert len(named_options) == 13
assert len(initial_options) == 10
assert not any(
option["is_initial"] for option in options if option["ref"]["type"] == isolated_app.REF_TYPE_TIP
)
response = WsgiClient(isolated_app.app).get("/alice/many")
assert response.text.count('data-ref-label="') == 10
assert response.text.count('data-ref-initial="true"') == 10
assert 'data-ref-label="tag v00"' not in response.text
assert '[hidden] { display: none !important; }' in Path("static/styles.css").read_text(encoding="utf-8")
def test_repo_ref_picker_limits_branch_recents_but_searches_all_branches(isolated_app):
owner = create_user("alice")
isolated_app.create_repository(owner, "branches", "")
path = isolated_app.repo_path("alice", "branches")
node = commit_file(path, "README.md", "# Branches\n", message="initial", user=owner["username"])
for index in range(12):
isolated_app.run_git(["update-ref", f"refs/heads/topic{index:02d}", node], cwd=path)
response = WsgiClient(isolated_app.app).get("/alice/branches")
assert response.text.count('data-ref-label="') == 10
assert response.text.count('data-ref-initial="true"') == 10
assert 'data-ref-label="branch topic00"' not in response.text
search_response = WsgiClient(isolated_app.app).get("/alice/branches/refs/search?q=topic00")
assert search_response.status_code == 200
assert any(
result["type"] == isolated_app.REF_TYPE_BRANCH and result["name"] == "topic00"
for result in json.loads(search_response.text)["results"]
)
def test_repo_ref_search_finds_refs_outside_initial_picker_options(isolated_app):
owner = create_user("alice")
isolated_app.create_repository(owner, "many", "")
path = isolated_app.repo_path("alice", "many")
node = commit_file(path, "README.md", "# Many\n", message="initial", user=owner["username"])
for index in range(12):
isolated_app.run_git(["tag", f"v{index:02d}", node], cwd=path)
isolated_app.run_git(["update-ref", f"refs/heads/topic{index:02d}", node], cwd=path)
initial_labels = {
option["label"]
for option in isolated_app.repo_ref_options(path)
if option["is_initial"]
}
assert "branch topic00" not in initial_labels
assert "tag v00" not in initial_labels
client = WsgiClient(isolated_app.app)
branch_response = client.get("/alice/many/refs/search?q=topic00")
tag_response = client.get("/alice/many/refs/search?q=v00")
empty_response = client.get("/alice/many/refs/search")
assert branch_response.status_code == 200
assert branch_response.header("Content-Type").startswith("application/json")
assert any(
result["type"] == isolated_app.REF_TYPE_BRANCH and result["name"] == "topic00"
for result in json.loads(branch_response.text)["results"]
)
assert any(
result["type"] == isolated_app.REF_TYPE_TAG and result["name"] == "v00"
for result in json.loads(tag_response.text)["results"]
)
assert json.loads(empty_response.text)["results"] == []
def test_repo_ref_search_finds_commits_by_subject_and_sha_across_all_refs(isolated_app):
owner = create_user("alice")
nodes = create_repo_with_refs(owner)
client = WsgiClient(isolated_app.app)
subject_response = client.get(f"/alice/demo/refs/search?{urlencode({'q': 'old branch work'})}")
sha_response = client.get(f"/alice/demo/refs/search?{urlencode({'q': nodes['old_node'][:12]})}")
assert subject_response.status_code == 200
assert any(
result["type"] == isolated_app.REF_TYPE_COMMIT
and result["name"] == nodes["old_node"]
and "old branch work" in result["label"]
for result in json.loads(subject_response.text)["results"]
)
assert any(
result["type"] == isolated_app.REF_TYPE_COMMIT and result["name"] == nodes["old_node"]
for result in json.loads(sha_response.text)["results"]
)
def test_bottle_signup_login_logout_and_new_repo_flow(isolated_app):
client = WsgiClient(isolated_app.app)
response = client.get("/new")
assert response.status_code == 303
assert response.location_path == "/login?next=/new"
response = client.get("/signup?next=/new")
assert response.status_code == 200
response = client.post(
"/signup",
{"username": "alice", "password": "password123", "next": "/new"},
)
assert response.status_code == 303
assert response.location_path == "/new"
assert "session" in client.cookies
response = client.get("/new")
assert response.status_code == 200
assert "Create repository" in response.text
response = client.post("/new", {"name": "demo", "description": "A test repository"})
assert response.status_code == 303
assert response.location_path == "/alice/demo"
assert isolated_app.get_repo("alice", "demo") is not None
response = client.get("/alice/demo")
assert response.status_code == 200
assert "This repository is empty." in response.text
assert "http://example.test/git/alice/demo" in response.text
response = client.post("/logout")
assert response.status_code == 303
assert response.location_path == "/"
assert "session" not in client.cookies
response = client.get("/new")
assert response.status_code == 303
assert response.location_path == "/login?next=/new"
def test_bottle_repository_pages_render_refs_files_raw_and_errors(isolated_app):
owner = create_user("alice")
nodes = create_repo_with_refs(owner)
client = WsgiClient(isolated_app.app)
commit_short = nodes["default_node"][:12]
checks = [
("/", 200, "Recent Activity"),
("/alice", 200, "alice/demo"),
("/alice?tab=stars", 200, "No starred repositories yet."),
("/alice/demo", 200, "<h1>Demo</h1>"),
("/alice/demo/src", 200, "README.md"),
("/alice/demo/src/README.md", 200, "# Demo"),
(f"/alice/demo/commits/{commit_short}", 200, "initial"),
("/alice/demo/commits", 200, "initial"),
("/alice/demo/branches", 200, "feature"),
("/alice/demo/branches", 200, "old"),
("/alice/demo/tags", 200, "v1.0"),
("/alice/demo/src?ref_type=branch&ref=feature", 200, "feature.txt"),
("/alice/demo/src/feature.txt?ref_type=branch&ref=feature", 200, "feature work"),
("/alice/demo/src?ref_type=tag&ref=v1.0", 200, "feature.txt"),
("/alice/demo/commits?ref_type=tag&ref=v1.0", 200, "add feature"),
("/alice/demo/issues", 200, "No open issues."),
("/alice/demo/pulls", 200, "No open pull requests."),
("/static/icon.png", 200, ""),
("/favicon.ico", 204, ""),
("/alice/missing", 404, "Repository not found."),
("/alice/demo/src/missing.txt", 404, "Path not found."),
("/alice/demo/src/docs/../secret", 400, "Invalid repository path."),
("/alice/demo/src?ref_type=branch&ref=missing", 404, "Branch not found."),
("/alice/demo/src?ref_type=tag&ref=missing", 404, "Tag not found."),
]
for path, status_code, expected_text in checks:
response = client.get(path)
assert response.status_code == status_code, path
assert expected_text in response.text, path
repo_response = client.get("/alice/demo")
issues_response = client.get("/alice/demo/issues")
pulls_response = client.get("/alice/demo/pulls")
assert 'data-ref-label="tag v1.0"' in repo_response.text
assert 'data-ref-initial="true"' in repo_response.text
assert 'class="ref-picker"' in repo_response.text
assert 'class="ref-picker"' not in issues_response.text
assert 'class="ref-picker"' not in pulls_response.text
response = client.get("/alice/demo/raw/feature.txt?ref_type=branch&ref=feature")
assert response.status_code == 200
assert response.body == b"feature work\n"
assert response.header("Content-Type").startswith("text/plain")
def test_bottle_profile_star_fork_and_repo_settings_flows(isolated_app):
owner = create_user("alice")
bob = create_user("bob")
create_repo_with_refs(owner)
bob_client = WsgiClient(isolated_app.app)
response = login_client(bob_client, "bob")
assert response.status_code == 303
response = bob_client.post("/alice/demo/star", {"action": "star"})
assert response.status_code == 303
assert isolated_app.repo_star_count(isolated_app.get_repo("alice", "demo")["id"]) == 1
response = bob_client.post("/alice/demo/fork", {"name": "demo", "description": "Forked"})
assert response.status_code == 303
assert response.location_path == "/bob/demo"
assert isolated_app.get_repo("bob", "demo") is not None
assert "Fork of <a href=\"/alice/demo\">alice/demo</a>" in bob_client.get("/bob/demo").text
owner_client = WsgiClient(isolated_app.app)
response = login_client(owner_client, "alice")
assert response.status_code == 303
response = owner_client.post(
"/settings/profile",
{"display_name": "Alice A.", "bio": "Git maintainer", "website": "example.com"},
)
assert response.status_code == 200
assert "Profile updated." in response.text
profile = owner_client.get("/alice")
assert "Alice A." in profile.text
assert "https://example.com" in profile.text
assert 'style="color:black;"' not in profile.text
settings_response = owner_client.get("/alice/demo/settings")
assert settings_response.status_code == 200
assert 'class="ref-picker"' not in settings_response.text
response = owner_client.post("/alice/demo/settings", {"action": "save", "description": "Updated"})
assert response.status_code == 200
assert "Repository settings updated." in response.text
assert isolated_app.get_repo("alice", "demo")["description"] == "Updated"
response = owner_client.post("/alice/demo/settings", {"action": "add_contributor", "username": "bob"})
assert response.status_code == 303
repo = isolated_app.get_repo("alice", "demo")
assert isolated_app.user_can_maintain_repo(bob, repo)
response = owner_client.post("/alice/demo/settings", {"action": "remove_contributor", "user_id": str(bob["id"])})
assert response.status_code == 303
assert not isolated_app.user_can_maintain_repo(bob, repo)
response = owner_client.post("/alice/demo/settings", {"action": "delete", "confirm_name": "wrong"})
assert response.status_code == 200
assert "Type "demo" to confirm deletion." in response.text
def test_bottle_issue_routes_create_comment_close_and_reopen(isolated_app):
owner = create_user("alice")
create_repo_with_refs(owner)
client = WsgiClient(isolated_app.app)
login_client(client, "alice")
response = client.get("/alice/demo/issues/new")
assert response.status_code == 200
assert "Open issue" in response.text
response = client.post("/alice/demo/issues/new", {"title": "", "body": ""})
assert response.status_code == 200
assert "Issue title is required." in response.text
response = client.post("/alice/demo/issues/new", {"title": "Bug report", "body": "It fails"})
assert response.status_code == 303
assert response.location_path == "/alice/demo/issues/1"
response = client.get("/alice/demo/issues/1")
assert response.status_code == 200
assert "Bug report" in response.text
assert "It fails" in response.text
assert 'class="ref-picker"' not in response.text
response = client.post("/alice/demo/issues/1", {"action": "comment", "body": ""})
assert response.status_code == 200
assert "Comment body is required." in response.text
response = client.post("/alice/demo/issues/1", {"action": "comment", "body": "I can reproduce this"})
assert response.status_code == 303
assert "I can reproduce this" in client.get("/alice/demo/issues/1").text
response = client.post("/alice/demo/issues/1", {"action": "close"})
assert response.status_code == 303
assert isolated_app.get_issue(isolated_app.get_repo("alice", "demo")["id"], 1)["status"] == "closed"
response = client.post("/alice/demo/issues/1", {"action": "reopen"})
assert response.status_code == 303
assert isolated_app.get_issue(isolated_app.get_repo("alice", "demo")["id"], 1)["status"] == "open"
def test_bottle_pull_request_routes_create_comment_forbid_and_merge(isolated_app):
owner = create_user("alice")
author = create_user("bob")
isolated_app.create_repository(owner, "demo", "")
target_path = isolated_app.repo_path("alice", "demo")
base_node = commit_file(target_path, "README.md", "# Demo\n", message="initial", user="alice")
target_repo = isolated_app.get_repo("alice", "demo")
isolated_app.fork_repository(author, target_repo, "demo-fork", "forked copy")
source_repo = isolated_app.get_repo("bob", "demo-fork")
source_path = isolated_app.repo_path("bob", "demo-fork")
source_node = commit_file(
source_path,
"feature.txt",
"new feature\n",
message="add feature",
user="bob",
branch="feature/pr",
)
bob_client = WsgiClient(isolated_app.app)
login_client(bob_client, "bob")
response = bob_client.get("/alice/demo/pulls/new")
assert response.status_code == 200
assert "bob/demo-fork HEAD" in response.text
assert "bob/demo-fork branch feature/pr" in response.text
assert 'class="ref-picker"' not in response.text
response = bob_client.post(
"/alice/demo/pulls/new",
{
"source_ref": isolated_app.source_ref_option_value(
source_repo["id"], isolated_app.REF_TYPE_BRANCH, "feature/pr"
),
"target_ref": isolated_app.ref_option_value(isolated_app.REF_TYPE_BRANCH, "main"),
"title": "Add feature",
"body": "Please merge this",
},
)
assert response.status_code == 303
assert response.location_path == "/alice/demo/pulls/1"
pr = isolated_app.get_pull_request(target_repo["id"], 1)
assert pr["base_node"] == base_node
assert pr["source_node"] == source_node
assert pr["source_ref_type"] == isolated_app.REF_TYPE_BRANCH
assert pr["source_ref_name"] == "feature/pr"
response = bob_client.get("/alice/demo/pulls/1")
assert response.status_code == 200
assert "feature.txt" in response.text
assert "new feature" in response.text
assert 'class="ref-picker"' not in response.text
response = bob_client.post("/alice/demo/pulls/1", {"action": "comment", "body": "Looks ready"})
assert response.status_code == 303
assert "Looks ready" in bob_client.get("/alice/demo/pulls/1").text
response = bob_client.post("/alice/demo/pulls/1", {"action": "close"})
assert response.status_code == 403
assert "Only maintainers can update pull requests." in response.text
owner_client = WsgiClient(isolated_app.app)
login_client(owner_client, "alice")
response = owner_client.post("/alice/demo/pulls/1", {"action": "merge"})
assert response.status_code == 303
merged = isolated_app.get_pull_request(target_repo["id"], 1)
assert merged["status"] == "merged"
assert merged["merge_node"]
assert isolated_app.repo_has_revision(target_path, merged["merge_node"])
assert isolated_app.repo_has_revision(target_path, source_node)
response = owner_client.get("/alice/demo/pulls/1")
assert response.status_code == 200
assert "Merged by alice" in response.text
def test_git_http_routes_are_public_for_reads_and_protect_writes(isolated_app):
owner = create_user("alice", password="owner-password")
create_user("bob", password="bob-password")
isolated_app.create_repository(owner, "demo", "")
path = isolated_app.repo_path("alice", "demo")
commit_file(path, "README.md", "# Demo\n", message="initial", user="alice")
client = WsgiClient(isolated_app.app)
response = client.get("/git/alice/demo/info/refs?service=git-upload-pack")
assert response.status_code == 200
assert b"git-upload-pack" in response.body
response = client.get("/git/alice/demo/info/refs?service=git-receive-pack")
assert response.status_code == 401
assert response.header("WWW-Authenticate") == 'Basic realm="GitMan"'
assert response.header("Connection") is None
assert "Authentication required." in response.text
response = client.get(
"/git/alice/demo/info/refs?service=git-receive-pack",
headers=basic_auth("alice", "wrong"),
)
assert response.status_code == 401
assert response.header("Connection") is None
assert "Invalid Git credentials." in response.text
response = client.get(
"/git/alice/demo/info/refs?service=git-receive-pack",
headers=basic_auth("bob", "bob-password"),
)
assert response.status_code == 403
assert response.header("Connection") is None
assert "Push not authorized" in response.text
hook = path / "hooks" / "post-receive"
hook.unlink()
isolated_app.run_git(["config", "--unset", "receive.denyDeleteCurrent"], cwd=path, check=False)
response = client.get(
"/git/alice/demo/info/refs?service=git-receive-pack",
headers=basic_auth("alice", "owner-password"),
)
assert response.status_code == 200
assert b"git-receive-pack" in response.body
assert hook.exists()
assert isolated_app.run_git(["config", "receive.denyDeleteCurrent"], cwd=path).stdout.strip() == "warn"