patx/micropie
improved url_shortener example, added middleware to demo how to use mongo and micropie for rate limitting
Commit 271cb4a · patx · 2025-12-28T00:56:04-05:00
Comments
No comments yet.
Diff
diff --git a/examples/url_shortener/app.py b/examples/url_shortener/app.py
deleted file mode 100644
index e5cd501..0000000
--- a/examples/url_shortener/app.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# shorty_index_only.py
-# URL shortener on MicroPie using ONLY the `index` method for both create & redirect.
-# No middleware. Path params are mapped to `index(id=...)` automatically by MicroPie.
-# Run: uvicorn shorty_index_only:app --reload
-
-import re
-import secrets
-import string
-from typing import Optional
-from urllib.parse import urlparse
-
-from micropie import App
-from pickledb import PickleDB
-
-# ---- storage ----
-db = PickleDB("shorty.json") # persisted JSON file
-
-# ---- config ----
-ALPHABET = string.ascii_letters + string.digits
-SLUG_LEN = 6
-
-# ---- helpers ----
-def _rand_slug(n: int = SLUG_LEN) -> str:
- return "".join(secrets.choice(ALPHABET) for _ in range(n))
-
-def _is_valid_url(url: str) -> bool:
- """Basic URL validation."""
- if not url:
- return False
- # Simple regex for URL validation (http(s) scheme, domain, optional path)
- pattern = r'^https?://[a-zA-Z0-9-._~:/?#[\]@!$&\'()*+,;=]+[^/]$'
- return bool(re.match(pattern, url))
-
-class Root(App):
-
- async def index(self, id: Optional[str] = None):
- if self.request.method == "POST":
- # Use body_params for form data instead of query_params
- url = self.request.body_params.get('url', [''])[0]
- if not _is_valid_url(url):
- return 400, "Invalid URL provided"
- slug = _rand_slug()
- while db.get(slug):
- slug = _rand_slug()
- db.set(slug, url)
- db.save()
- short = f"http://127.0.0.1:8000/{slug}"
- return f"Your short URL is <a href='{short}'>{short}</a>"
- elif self.request.method == "GET":
- if id:
- dest = db.get(id)
- if not dest:
- return 404, "Not Found"
- # Ensure destination URL is valid before redirecting
- if not _is_valid_url(dest):
- return 400, "Invalid destination URL"
- return self._redirect(dest)
- else:
- return (
- """
- <h1>Shorty</h1>
- <form method="post" enctype="application/x-www-form-urlencoded">
- <input type="url" name="url" placeholder="https://example.com" required style="width:28rem">
- <button type="submit">Shorten</button>
- </form>
- """
- )
-
-app = Root()
diff --git a/examples/url_shortener/main.py b/examples/url_shortener/main.py
new file mode 100644
index 0000000..9f1409f
--- /dev/null
+++ b/examples/url_shortener/main.py
@@ -0,0 +1,49 @@
+from string import ascii_letters, digits
+from secrets import choice
+
+from micropie import App
+from mongokv import Mkv
+
+from middlewares.rate_limit import MongoRateLimitMiddleware
+
+
+URL_ROOT = "http://localhost:8000/"
+MONGO_URI = "mongodb://localhost:27017"
+
+db = Mkv(MONGO_URI)
+
+
+class Shorty(App):
+
+ def _generate_id(self, length: int = 8) -> str:
+ return "".join(choice(ascii_letters + digits) for _ in range(length))
+
+ async def index(self, url_str: str | None = None):
+ if url_str:
+ if self.request.method == "POST":
+ while True:
+ short_id = self._generate_id()
+ try:
+ await db.get(short_id)
+ except KeyError:
+ break
+
+ await db.set(short_id, url_str)
+ return await self._render_template(
+ "success.html",
+ url_id=url_str,
+ short_id=short_id,
+ url_root=URL_ROOT,
+ )
+
+ real_url = await db.get(url_str, "/")
+ if not isinstance(real_url, str) or not real_url.startswith(("http://", "https://")):
+ return self._redirect("/")
+ return self._redirect(real_url)
+
+ return await self._render_template("index.html")
+
+
+app = Shorty()
+app.middlewares.append(MongoRateLimitMiddleware(mongo_uri=MONGO_URI))
+
diff --git a/examples/url_shortener/middlewares/__init__.py b/examples/url_shortener/middlewares/__init__.py
new file mode 100644
index 0000000..6a0ca3c
--- /dev/null
+++ b/examples/url_shortener/middlewares/__init__.py
@@ -0,0 +1,3 @@
+from .rate_limit import MongoRateLimitMiddleware
+
+__all__ = ["MongoRateLimitMiddleware"]
diff --git a/examples/url_shortener/middlewares/rate_limit.py b/examples/url_shortener/middlewares/rate_limit.py
new file mode 100644
index 0000000..36bbd43
--- /dev/null
+++ b/examples/url_shortener/middlewares/rate_limit.py
@@ -0,0 +1,125 @@
+from __future__ import annotations
+
+from datetime import datetime, timedelta
+
+from motor.motor_asyncio import AsyncIOMotorClient
+from micropie import HttpMiddleware
+
+
+class MongoRateLimitMiddleware(HttpMiddleware):
+ """
+ Global MongoDB-based rate limiter.
+ """
+
+ MAX_REQUESTS = 50
+ WINDOW_SECONDS = 60
+
+ BLOCK_AFTER_VIOLATIONS = 3
+ BLOCK_FOR_SECONDS = 900
+
+ PERMA_WINDOW_HOURS = 24
+ PERMA_BLOCK_AFTER = 10
+
+ def __init__(
+ self,
+ mongo_uri: str,
+ db_name: str = "rate_limit",
+ collection_name: str = "list",
+ ):
+ self.client = AsyncIOMotorClient(mongo_uri)
+ self.db = self.client[db_name]
+ self.collection = self.db[collection_name]
+
+ async def before_request(self, request):
+ client = request.scope.get("client") or ("unknown", 0)
+ client_ip = client[0] or "unknown"
+ now = datetime.utcnow()
+
+ window_start_cutoff = now - timedelta(seconds=self.WINDOW_SECONDS)
+ perma_window_cutoff = now - timedelta(hours=self.PERMA_WINDOW_HOURS)
+
+ key = client_ip
+ doc = await self.collection.find_one({"_id": key})
+
+ if doc and doc.get("permanent_blocked"):
+ return {
+ "status_code": 403,
+ "body": f"Access permanently blocked for IP {client_ip}.",
+ "headers": [],
+ }
+
+ if doc:
+ blocked_until = doc.get("blocked_until")
+ if isinstance(blocked_until, datetime) and now < blocked_until:
+ retry_after = max(0, int((blocked_until - now).total_seconds()))
+ return {
+ "status_code": 429,
+ "body": f"Too many requests from {client_ip}. Temporarily blocked.",
+ "headers": [("Retry-After", str(retry_after))],
+ }
+
+ if (
+ not doc
+ or doc.get("window_start") is None
+ or doc["window_start"] < window_start_cutoff
+ ):
+ violations = doc.get("violations", 0) if doc else 0
+
+ await self.collection.replace_one(
+ {"_id": key},
+ {
+ "_id": key,
+ "ip": client_ip,
+ "count": 1,
+ "window_start": now,
+ "violations": violations,
+ "blocked_until": None,
+ "permanent_blocked": doc.get("permanent_blocked", False) if doc else False,
+ "permanent_blocked_at": doc.get("permanent_blocked_at") if doc else None,
+ "violation_events": doc.get("violation_events", []) if doc else [],
+ },
+ upsert=True,
+ )
+ return None
+
+ count = int(doc.get("count", 0))
+
+ if count >= self.MAX_REQUESTS:
+ violations = int(doc.get("violations", 0)) + 1
+
+ await self.collection.update_one(
+ {"_id": key},
+ {
+ "$set": {"violations": violations},
+ "$push": {"violation_events": now},
+ "$pull": {"violation_events": {"$lt": perma_window_cutoff}},
+ },
+ )
+
+ doc = await self.collection.find_one({"_id": key})
+ events_last_24h = len((doc or {}).get("violation_events", []))
+
+ update = {}
+
+ if violations >= self.BLOCK_AFTER_VIOLATIONS:
+ update["blocked_until"] = now + timedelta(seconds=self.BLOCK_FOR_SECONDS)
+
+ if events_last_24h >= self.PERMA_BLOCK_AFTER:
+ update["permanent_blocked"] = True
+ update["permanent_blocked_at"] = now
+
+ if update:
+ await self.collection.update_one({"_id": key}, {"$set": update})
+
+ return {
+ "status_code": 429,
+ "body": f"Rate limit exceeded for IP {client_ip}.",
+ "headers": [],
+ }
+
+ await self.collection.update_one({"_id": key}, {"$inc": {"count": 1}})
+ return None
+
+ async def after_request(self, request, status_code, response_body, extra_headers):
+ return None
+
diff --git a/examples/url_shortener/shorty/main.py b/examples/url_shortener/shorty/main.py
deleted file mode 100644
index bffbd04..0000000
--- a/examples/url_shortener/shorty/main.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from string import ascii_letters
-from secrets import choice
-
-from micropie import App
-from mongokv import Mkv
-
-URLROOT = "http://localhost:8000/"
-db = Mkv("mongodb://localhost:27017")
-
-
-class Shorty(App):
-
- async def index(self, url_str: str | None = None):
- if url_str:
-
- if self.request.method == "POST":
- short_id = "".join(choice(ascii_letters) for _ in range(6))
- await db.set(short_id, url_str)
- return await self._render_template("success.html",
- url_id=url_str, short_id=short_id, url_root=URLROOT)
-
- real_url = await db.get(url_str, "/")
- return self._redirect(real_url)
-
- return await self._render_template("index.html")
-
-
-app = Shorty()
diff --git a/examples/url_shortener/shorty/templates/index.html b/examples/url_shortener/templates/index.html
similarity index 100%
rename from examples/url_shortener/shorty/templates/index.html
rename to examples/url_shortener/templates/index.html
diff --git a/examples/url_shortener/shorty/templates/success.html b/examples/url_shortener/templates/success.html
similarity index 96%
rename from examples/url_shortener/shorty/templates/success.html
rename to examples/url_shortener/templates/success.html
index 1f9fb4f..393c336 100644
--- a/examples/url_shortener/shorty/templates/success.html
+++ b/examples/url_shortener/templates/success.html
@@ -79,7 +79,7 @@ button:hover{
<div class="url-box">
<div class="short-url" id="shortUrl">{{ url_root }}{{ short_id }}</div>
</div>
- <button onclick="copyUrl()">Copy Link</button>
+ <button class="copy-btn" onclick="copyUrl()">Copy Link</button>
<br>
<a href="/" class="back-link">← Shorten another</a>
</div>