import asyncio
import hmac
import json
import os
import secrets
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from typing import Any, Dict, List, Optional, Tuple
import stripe
from bson import ObjectId
from bson.errors import InvalidId
from markupsafe import Markup, escape
from pymongo import AsyncMongoClient, ReturnDocument
from pymongo.errors import DuplicateKeyError
from micropie import App
PAGE_SIZE = 20
ADMIN_COOKIE = "invoice_admin"
SUCCESSFUL_CHECKOUT_EVENTS = {
"checkout.session.completed",
"checkout.session.async_payment_succeeded",
}
def load_dotenv(path: str = ".env") -> None:
if not os.path.exists(path):
return
with open(path, "r", encoding="utf-8") as env_file:
for raw_line in env_file:
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key:
os.environ.setdefault(key, value)
def now_utc() -> datetime:
return datetime.now(timezone.utc)
def parse_money_to_cents(value: str) -> Optional[int]:
if value is None:
return None
value = value.strip().replace(",", "").replace("$", "")
if not value:
return None
try:
amount = Decimal(value).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
except (InvalidOperation, ValueError):
return None
if amount < 0:
return None
return int(amount * 100)
def cents_to_money(cents: int, currency: str = "USD") -> str:
amount = Decimal(int(cents or 0)) / Decimal(100)
if currency.upper() == "USD":
return f"${amount:,.2f}"
return f"{currency.upper()} {amount:,.2f}"
def format_star_emphasis(value: Any) -> Markup:
text = "" if value is None else str(value)
rendered = Markup("")
position = 0
while True:
start = text.find("**", position)
if start == -1:
rendered += escape(text[position:])
break
end = text.find("**", start + 2)
if end == -1:
rendered += escape(text[position:])
break
rendered += escape(text[position:start])
rendered += Markup('<strong class="star-emphasis">')
rendered += escape(text[start + 2 : end])
rendered += Markup("</strong>")
position = end + 2
return rendered
def object_id(value: str) -> Optional[ObjectId]:
try:
return ObjectId(value)
except (InvalidId, TypeError):
return None
def as_plain_dict(value: Any) -> Dict[str, Any]:
if isinstance(value, dict):
return value
if hasattr(value, "to_dict_recursive"):
return value.to_dict_recursive()
if hasattr(value, "model_dump"):
return value.model_dump()
if hasattr(value, "to_dict"):
return value.to_dict()
return {}
class InvoiceApp(App):
def __init__(self) -> None:
super().__init__()
self.mongo_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017")
self.mongo_db_name = os.getenv("MONGODB_DB", "invoice_maker")
timeout_ms = int(os.getenv("MONGODB_SERVER_SELECTION_TIMEOUT_MS", "5000"))
self.client = AsyncMongoClient(
self.mongo_uri,
serverSelectionTimeoutMS=timeout_ms,
connect=False,
)
self._indexes_ready = False
self._index_lock = asyncio.Lock()
self.admin_password = os.getenv("ADMIN_PASSWORD", "admin")
self.admin_cookie_secret = (
os.getenv("ADMIN_COOKIE_SECRET")
or os.getenv("APP_SECRET")
or self.admin_password
or "invoice-maker-dev"
)
self.app_base_url = os.getenv("APP_BASE_URL", "").rstrip("/")
self.public_logo_url = os.getenv("PUBLIC_LOGO_URL", "").strip()
self.public_logo_alt = os.getenv("PUBLIC_LOGO_ALT", "ProjectPay Logo").strip()
self.currency = os.getenv("STRIPE_CURRENCY", "usd").upper()
self.stripe_secret_key = os.getenv("STRIPE_SECRET_KEY", "")
self.stripe_webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET", "")
self.stripe_product_id = os.getenv("STRIPE_PRODUCT_ID", "")
self.stripe_minimum_amount_cents = int(
os.getenv("STRIPE_MINIMUM_AMOUNT_CENTS", "100") or "100"
)
self.stripe_http_client = None
if self.stripe_secret_key:
stripe.api_key = self.stripe_secret_key
self.stripe_http_client = stripe.HTTPXClient()
stripe.default_http_client = self.stripe_http_client
self.shutdown_handlers.append(self._shutdown_clients)
if self.env is not None:
self.env.filters["money"] = cents_to_money
self.env.filters["star_emphasis"] = format_star_emphasis
self.env.globals["money"] = cents_to_money
async def _db(self):
await self._ensure_indexes()
return self.client[self.mongo_db_name]
async def _ensure_indexes(self) -> None:
if self._indexes_ready:
return
async with self._index_lock:
if self._indexes_ready:
return
db = self.client[self.mongo_db_name]
await db.projects.create_index("project_number", unique=True)
await db.projects.create_index("share_token", unique=True)
await db.projects.create_index("customer_name")
await db.projects.create_index("created_at")
await db.payments.create_index("project_id")
await db.payments.create_index("webhook_id", unique=True, sparse=True)
await db.payments.create_index("stripe_event_id", unique=True, sparse=True)
await db.payments.create_index(
"stripe_payment_intent_id", unique=True, sparse=True
)
await db.payments.create_index(
"stripe_checkout_session_id", unique=True, sparse=True
)
await db.checkout_sessions.create_index(
"session_id", unique=True, sparse=True
)
await db.checkout_sessions.create_index("project_id")
await db.webhook_events.create_index(
"webhook_id", unique=True, sparse=True
)
self._indexes_ready = True
async def _shutdown_clients(self) -> None:
if self.stripe_http_client is not None:
await self.stripe_http_client.close_async()
await self.client.close()
async def __call__(self, scope, receive, send) -> None:
if (
scope.get("type") == "http"
and scope.get("method") == "POST"
and scope.get("path") == "/webhook/stripe"
):
await self._webhook_asgi(scope, receive, send)
return
await super().__call__(scope, receive, send)
async def _read_raw_body(self, receive) -> bytes:
chunks: List[bytes] = []
while True:
message = await receive()
if message["type"] == "http.disconnect":
break
if body := message.get("body", b""):
chunks.append(body)
if not message.get("more_body"):
break
return b"".join(chunks)
async def _webhook_asgi(self, scope, receive, send) -> None:
raw_body = await self._read_raw_body(receive)
headers = {
key.decode("utf-8", "replace").lower(): value.decode("utf-8", "replace")
for key, value in scope.get("headers", [])
}
status, body = await self._handle_stripe_webhook(raw_body, headers)
await self._send_response(
send,
status,
json.dumps(body),
[("Content-Type", "application/json")],
)
async def _handle_stripe_webhook(
self, raw_body: bytes, headers: Dict[str, str]
) -> Tuple[int, Dict[str, Any]]:
if not self.stripe_webhook_secret:
return 500, {"error": "Stripe webhook verification is not configured"}
try:
event = stripe.Webhook.construct_event(
raw_body,
headers.get("stripe-signature", ""),
self.stripe_webhook_secret,
)
except ValueError:
return 400, {"error": "Invalid JSON payload"}
except stripe.error.SignatureVerificationError:
return 401, {"error": "Invalid webhook signature"}
payload = as_plain_dict(event)
webhook_id = str(payload.get("id") or "")
if not webhook_id:
return 400, {"error": "Webhook event id is missing"}
event_type = payload.get("type", "")
created_at = now_utc()
db = await self._db()
existing_event = await db.webhook_events.find_one(
{"webhook_id": webhook_id},
{"processed": 1},
)
if existing_event and existing_event.get("processed") is True:
return 200, {"received": True, "duplicate": True}
try:
await db.webhook_events.update_one(
{"webhook_id": webhook_id},
{
"$setOnInsert": {
"webhook_id": webhook_id,
"created_at": created_at,
},
"$set": {
"provider": "stripe",
"event_type": event_type,
"payload": payload,
"processed": False,
"last_received_at": created_at,
},
"$inc": {"attempts": 1},
},
upsert=True,
)
except DuplicateKeyError:
existing_event = await db.webhook_events.find_one(
{"webhook_id": webhook_id},
{"processed": 1},
)
if existing_event and existing_event.get("processed") is True:
return 200, {"received": True, "duplicate": True}
try:
outcome = await self._process_stripe_webhook(payload, webhook_id)
await db.webhook_events.update_one(
{"webhook_id": webhook_id},
{
"$set": {
"processed": True,
"processed_at": now_utc(),
"outcome": outcome,
},
"$unset": {"error": ""},
},
)
return 200, {"received": True, **outcome}
except Exception as exc:
await db.webhook_events.update_one(
{"webhook_id": webhook_id},
{
"$set": {
"processed": False,
"processed_at": now_utc(),
"error": str(exc),
}
},
)
return 500, {"error": "Webhook processing failed"}
async def _process_stripe_webhook(
self, payload: Dict[str, Any], webhook_id: str
) -> Dict[str, Any]:
event_type = payload.get("type", "")
data = payload.get("data") or {}
session = as_plain_dict(data.get("object"))
if event_type not in SUCCESSFUL_CHECKOUT_EVENTS:
return {"ignored": True, "reason": "not a successful checkout session"}
if str(session.get("payment_status") or "").lower() != "paid":
return {"ignored": True, "reason": "checkout session is not paid"}
project = await self._project_from_stripe_session(session)
if project is None:
return {"ignored": True, "reason": "project metadata not found"}
try:
amount_cents = int(session.get("amount_total") or 0)
except (TypeError, ValueError):
amount_cents = 0
if amount_cents <= 0:
return {"ignored": True, "reason": "payment amount not found"}
checkout_session_id = str(session.get("id") or "")
payment_intent_id = str(session.get("payment_intent") or "")
currency = str(session.get("currency") or self.currency).upper()
payment_doc = {
"project_id": project["_id"],
"provider": "stripe",
"webhook_id": webhook_id,
"stripe_event_id": webhook_id,
"amount_cents": amount_cents,
"currency": currency,
"status": "succeeded",
"event_type": payload.get("type", ""),
"raw_event": payload,
"created_at": now_utc(),
}
if checkout_session_id:
payment_doc["stripe_checkout_session_id"] = checkout_session_id
if payment_intent_id:
payment_doc["stripe_payment_intent_id"] = payment_intent_id
query: Dict[str, Any] = {"stripe_event_id": webhook_id}
if payment_intent_id:
query = {"stripe_payment_intent_id": payment_intent_id}
elif checkout_session_id:
query = {"stripe_checkout_session_id": checkout_session_id}
db = await self._db()
try:
insert_result = await db.payments.update_one(
query,
{"$setOnInsert": payment_doc},
upsert=True,
)
except DuplicateKeyError:
insert_result = None
if checkout_session_id:
await db.checkout_sessions.update_one(
{"session_id": checkout_session_id},
{
"$set": {
"status": "paid",
"amount_cents": amount_cents,
"stripe_payment_intent_id": payment_intent_id,
"paid_at": now_utc(),
}
},
)
await self._refresh_project_status(project["_id"])
if insert_result is None or insert_result.upserted_id is None:
return {"received": True, "duplicate": True}
return {"received": True, "payment_recorded": True}
async def _project_from_stripe_session(
self, session: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
metadata = as_plain_dict(session.get("metadata"))
project_id = (
metadata.get("project_id")
or metadata.get("projectId")
or session.get("client_reference_id")
)
share_token = metadata.get("share_token") or metadata.get("shareToken")
db = await self._db()
if project_id:
oid = object_id(str(project_id))
if oid is not None:
project = await db.projects.find_one({"_id": oid})
if project:
return project
if share_token:
project = await db.projects.find_one({"share_token": str(share_token)})
if project:
return project
checkout_session_id = session.get("id")
if checkout_session_id:
checkout_doc = await db.checkout_sessions.find_one(
{"session_id": str(checkout_session_id)}
)
if checkout_doc:
return await db.projects.find_one({"_id": checkout_doc["project_id"]})
return None
def _stripe_metadata(self, project: Dict[str, Any]) -> Dict[str, str]:
return {
"project_id": str(project["_id"]),
"project_number": str(project["project_number"]),
"share_token": str(project["share_token"]),
}
async def _create_checkout_session(self, project: Dict[str, Any]) -> Dict[str, Any]:
if not self.stripe_secret_key:
raise RuntimeError("STRIPE_SECRET_KEY is not configured")
if not self.stripe_product_id:
raise RuntimeError("STRIPE_PRODUCT_ID is not configured")
stripe.api_key = self.stripe_secret_key
summary = await self._summarize_project(project)
balance_cents = int(summary["balance_cents"])
if balance_cents <= 0:
raise RuntimeError("Project is already paid in full")
public_url = self._public_project_url(project)
metadata = self._stripe_metadata(project)
minimum_amount_cents = max(1, min(self.stripe_minimum_amount_cents, balance_cents))
price = await stripe.Price.create_async(
currency=self.currency.lower(),
product=self.stripe_product_id,
custom_unit_amount={
"enabled": True,
"minimum": minimum_amount_cents,
"maximum": balance_cents,
"preset": balance_cents,
},
metadata=metadata,
)
price_dict = as_plain_dict(price)
price_id = price_dict.get("id") or getattr(price, "id", None)
if not price_id:
raise RuntimeError("Stripe price did not include an id")
payload: Dict[str, Any] = {
"mode": "payment",
"line_items": [{"price": price_id, "quantity": 1}],
"success_url": f"{public_url}?checkout=return",
"cancel_url": f"{public_url}?checkout=cancel",
"client_reference_id": str(project["_id"]),
"metadata": metadata,
"payment_intent_data": {"metadata": metadata},
}
if project.get("customer_email"):
payload["customer_email"] = project["customer_email"]
response = await stripe.checkout.Session.create_async(**payload)
response_dict = as_plain_dict(response)
checkout_url = response_dict.get("url") or getattr(response, "url", None)
session_id = response_dict.get("id") or getattr(response, "id", None)
if not checkout_url:
raise RuntimeError("Stripe checkout session did not include a url")
return {
"checkout_url": checkout_url,
"session_id": session_id,
"stripe_price_id": price_id,
"balance_cents": balance_cents,
}
def _signed_admin_cookie(self) -> str:
signature = hmac.new(
self.admin_cookie_secret.encode("utf-8"),
b"admin",
"sha256",
).hexdigest()
return f"admin.{signature}"
def _cookie_header(self, max_age: int) -> str:
value = self._signed_admin_cookie() if max_age > 0 else ""
return (
f"{ADMIN_COOKIE}={value}; Path=/; Max-Age={max_age}; "
"SameSite=Lax; HttpOnly"
)
def _is_admin(self) -> bool:
cookie_header = self.request.headers.get("cookie", "")
cookies: Dict[str, str] = {}
for part in cookie_header.split(";"):
if "=" in part:
key, value = part.strip().split("=", 1)
cookies[key] = value
actual = cookies.get(ADMIN_COOKIE, "")
return hmac.compare_digest(actual, self._signed_admin_cookie())
def _require_admin(self):
if not self._is_admin():
return self._redirect("/login")
return None
def _base_url(self) -> str:
if self.app_base_url:
return self.app_base_url
headers = self.request.headers
proto = headers.get("x-forwarded-proto") or "http"
host = headers.get("x-forwarded-host") or headers.get("host") or "127.0.0.1:8000"
return f"{proto}://{host}".rstrip("/")
def _public_project_url(self, project: Dict[str, Any]) -> str:
return f"{self._base_url()}/p/{project['share_token']}"
async def _next_project_number(self) -> str:
db = await self._db()
counter = await db.counters.find_one_and_update(
{"_id": "project_number"},
{"$inc": {"value": 1}},
upsert=True,
return_document=ReturnDocument.AFTER,
)
return f"PRJ-{now_utc().year}-{int(counter['value']):04d}"
def _project_total(self, project: Dict[str, Any]) -> int:
return sum(int(item.get("amount_cents", 0)) for item in project.get("line_items", []))
async def _project_paid(self, project_id: ObjectId) -> int:
pipeline = [
{"$match": {"project_id": project_id, "status": "succeeded"}},
{"$group": {"_id": "$project_id", "total": {"$sum": "$amount_cents"}}},
]
db = await self._db()
cursor = await db.payments.aggregate(pipeline)
rows = await cursor.to_list(length=1)
return int(rows[0]["total"]) if rows else 0
async def _summarize_project(self, project: Dict[str, Any]) -> Dict[str, Any]:
total = self._project_total(project)
paid = await self._project_paid(project["_id"])
balance = max(total - paid, 0)
status = "paid" if total > 0 and balance == 0 else project.get("status", "open")
return {
**project,
"id": str(project["_id"]),
"scope_terms": project.get("scope_terms", ""),
"total_cents": total,
"paid_cents": paid,
"balance_cents": balance,
"status": status,
"share_url": self._public_project_url(project),
}
async def _find_projects(self, q: str, page: int) -> Tuple[List[Dict[str, Any]], bool]:
query: Dict[str, Any] = {}
q = (q or "").strip()
if q:
clauses: List[Dict[str, Any]] = [
{"project_number": {"$regex": q, "$options": "i"}},
{"status": {"$regex": q, "$options": "i"}},
{"customer_name": {"$regex": q, "$options": "i"}},
]
oid = object_id(q)
if oid is not None:
clauses.append({"_id": oid})
query = {"$or": clauses}
skip = max(page - 1, 0) * PAGE_SIZE
db = await self._db()
cursor = db.projects.find(query).sort("created_at", -1).skip(skip).limit(PAGE_SIZE + 1)
docs = await cursor.to_list(length=PAGE_SIZE + 1)
has_more = len(docs) > PAGE_SIZE
projects = [await self._summarize_project(doc) for doc in docs[:PAGE_SIZE]]
return projects, has_more
def _parse_line_items(self) -> Tuple[List[Dict[str, Any]], List[str]]:
names = self.request.body_params.get("item_name", [])
prices = self.request.body_params.get("item_price", [])
max_len = max(len(names), len(prices))
items: List[Dict[str, Any]] = []
errors: List[str] = []
for index in range(max_len):
name = names[index].strip() if index < len(names) else ""
price = prices[index].strip() if index < len(prices) else ""
if not name and not price:
continue
amount_cents = parse_money_to_cents(price)
if not name:
errors.append("Each line item needs a name.")
if amount_cents is None or amount_cents <= 0:
errors.append("Each line item needs a price greater than 0.")
if name or price:
items.append(
{
"name": name,
"amount_cents": amount_cents or 0,
"price_input": price,
}
)
if not items:
errors.append("Add at least one line item.")
return items, errors
def _project_from_form(self, existing: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
items, errors = self._parse_line_items()
customer_name = (self.request.form("customer_name", "") or "").strip()
customer_email = (self.request.form("customer_email", "") or "").strip()
scope_terms = (self.request.form("scope_terms", "") or "").strip()
if not customer_name:
errors.append("Customer name is required.")
if not items:
items = [{"name": "", "amount_cents": 0, "price_input": ""}]
clean_items = [
{"name": item["name"], "amount_cents": item["amount_cents"]}
for item in items
if item.get("name") and item.get("amount_cents", 0) > 0
]
project_items = items if errors else clean_items
project = {
"customer_name": customer_name,
"customer_email": customer_email,
"scope_terms": scope_terms,
"line_items": project_items,
"status": existing.get("status", "open") if existing else "open",
}
if existing:
project.update(
{
"_id": existing["_id"],
"id": str(existing["_id"]),
"project_number": existing["project_number"],
"share_token": existing["share_token"],
}
)
project["errors"] = sorted(set(errors))
return project
async def _refresh_project_status(self, project_id: ObjectId) -> None:
db = await self._db()
project = await db.projects.find_one({"_id": project_id})
if not project:
return
total = self._project_total(project)
paid = await self._project_paid(project_id)
status = "paid" if total > 0 and paid >= total else "open"
await db.projects.update_one(
{"_id": project_id},
{"$set": {"status": status, "updated_at": now_utc()}},
)
async def _render(self, template: str, **kwargs: Any) -> str:
kwargs.setdefault("admin", self._is_admin())
kwargs.setdefault("currency", self.currency)
kwargs.setdefault("public_logo_url", self.public_logo_url)
kwargs.setdefault("public_logo_alt", self.public_logo_alt)
return await self._render_template(template, **kwargs)
async def index(self) -> Any:
if redirect := self._require_admin():
return redirect
page = int(self.request.query("page", "1") or "1")
q = self.request.query("q", "") or ""
projects, has_more = await self._find_projects(q, page)
return await self._render(
"index.html",
projects=projects,
q=q,
page=page,
has_more=has_more,
)
async def projects_page(self) -> Any:
if redirect := self._require_admin():
return redirect
page = int(self.request.query("page", "1") or "1")
q = self.request.query("q", "") or ""
projects, has_more = await self._find_projects(q, page)
return await self._render(
"_project_rows.html",
projects=projects,
page=page,
has_more=has_more,
)
async def login(self) -> Any:
if self.request.method == "POST":
password = self.request.form("password", "") or ""
if hmac.compare_digest(password, self.admin_password):
return self._redirect(
"/",
[("Set-Cookie", self._cookie_header(30 * 24 * 60 * 60))],
)
return await self._render("login.html", error="Invalid password.")
if self._is_admin():
return self._redirect("/")
return await self._render("login.html")
def logout(self) -> Any:
return self._redirect("/login", [("Set-Cookie", self._cookie_header(0))])
async def new(self) -> Any:
if redirect := self._require_admin():
return redirect
project = {
"customer_name": "",
"customer_email": "",
"scope_terms": "",
"line_items": [{"name": "", "amount_cents": 0}],
"errors": [],
}
return await self._render("form.html", project=project, mode="new")
async def create(self) -> Any:
if redirect := self._require_admin():
return redirect
if self.request.method != "POST":
return self._redirect("/new")
project = self._project_from_form()
if project["errors"]:
return await self._render("form.html", project=project, mode="new")
now = now_utc()
project_number = await self._next_project_number()
project_doc = {
"project_number": project_number,
"share_token": secrets.token_urlsafe(24),
"customer_name": project["customer_name"],
"customer_email": project["customer_email"],
"scope_terms": project["scope_terms"],
"line_items": project["line_items"],
"status": "open",
"created_at": now,
"updated_at": now,
}
db = await self._db()
result = await db.projects.insert_one(project_doc)
return self._redirect(f"/project/{result.inserted_id}")
async def edit(self, project_id: str) -> Any:
if redirect := self._require_admin():
return redirect
oid = object_id(project_id)
db = await self._db()
project = await db.projects.find_one({"_id": oid}) if oid else None
if not project:
return 404, "Project not found"
if self.request.method == "POST":
updated = self._project_from_form(project)
if updated["errors"]:
return await self._render("form.html", project=updated, mode="edit")
await db.projects.update_one(
{"_id": project["_id"]},
{
"$set": {
"customer_name": updated["customer_name"],
"customer_email": updated["customer_email"],
"scope_terms": updated["scope_terms"],
"line_items": updated["line_items"],
"updated_at": now_utc(),
}
},
)
await self._refresh_project_status(project["_id"])
return self._redirect(f"/project/{project['_id']}")
project = await self._summarize_project(project)
project["errors"] = []
return await self._render("form.html", project=project, mode="edit")
async def project(self, project_id: str) -> Any:
if redirect := self._require_admin():
return redirect
oid = object_id(project_id)
db = await self._db()
project = await db.projects.find_one({"_id": oid}) if oid else None
if not project:
return 404, "Project not found"
summary = await self._summarize_project(project)
payments = await db.payments.find({"project_id": project["_id"]}).sort(
"created_at", -1
).to_list(length=None)
for payment in payments:
payment["id"] = str(payment["_id"])
return await self._render(
"detail.html",
project=summary,
payments=payments,
)
async def delete(self, project_id: str) -> Any:
if redirect := self._require_admin():
return redirect
oid = object_id(project_id)
db = await self._db()
project = await db.projects.find_one({"_id": oid}) if oid else None
if not project:
return 404, "Project not found"
if self.request.method != "POST":
return self._redirect(f"/project/{project_id}")
payment_docs = await db.payments.find(
{"project_id": project["_id"], "webhook_id": {"$exists": True}},
{"webhook_id": 1},
).to_list(length=None)
payment_webhook_ids = [
payment["webhook_id"] for payment in payment_docs if payment.get("webhook_id")
]
if payment_webhook_ids:
await db.webhook_events.delete_many(
{"webhook_id": {"$in": payment_webhook_ids}}
)
await db.checkout_sessions.delete_many({"project_id": project["_id"]})
await db.payments.delete_many({"project_id": project["_id"]})
await db.projects.delete_one({"_id": project["_id"]})
return self._redirect("/")
async def p(self, share_token: str) -> Any:
db = await self._db()
project = await db.projects.find_one({"share_token": share_token})
if not project:
return 404, "Project not found"
summary = await self._summarize_project(project)
checkout_state = self.request.query("checkout", "") or ""
return await self._render(
"public_project.html",
project=summary,
checkout_state=checkout_state,
)
async def pay(self, share_token: str) -> Any:
db = await self._db()
project = await db.projects.find_one({"share_token": share_token})
if not project:
return 404, "Project not found"
summary = await self._summarize_project(project)
if self.request.method != "POST":
return self._redirect(f"/p/{share_token}")
if summary["balance_cents"] <= 0:
return self._redirect(f"/p/{share_token}")
try:
checkout = await self._create_checkout_session(project)
except Exception as exc:
return await self._render(
"public_project.html",
project=summary,
error=str(exc),
checkout_state="",
)
checkout_doc = {
"project_id": project["_id"],
"provider": "stripe",
"currency": self.currency,
"status": "created",
"created_at": now_utc(),
}
if checkout.get("session_id"):
checkout_doc["session_id"] = checkout["session_id"]
if checkout.get("stripe_price_id"):
checkout_doc["stripe_price_id"] = checkout["stripe_price_id"]
if checkout.get("balance_cents"):
checkout_doc["balance_cents_at_creation"] = checkout["balance_cents"]
await db.checkout_sessions.insert_one(checkout_doc)
return self._redirect(checkout["checkout_url"])
load_dotenv()
app = InvoiceApp()
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host=os.getenv("HOST", "127.0.0.1"),
port=int(os.getenv("PORT", "8000")),
)