patx/micropie
Increase framework work speed, by caching handler metadata to avoid repeated inspect.signature / coroutine checks.
Commit e88504d · patx · 2026-05-11T00:46:34-04:00
Increase framework work speed, by caching handler metadata to avoid repeated inspect.signature / coroutine checks. Skipped empty query parsing and session backend loads when no session cookie exists. Optimized request body buffering and JSON parsing. Centralized response header preparation with fast paths for default and JSON headers. Amortized in-memory session cleanup. Added regression coverage in tests.py for session fast paths, cached handler replacement, HTTP binding, and WebSocket binding.
Comments
No comments yet.
Diff
diff --git a/micropie.py b/micropie.py
index 01cb3ad..add903d 100644
--- a/micropie.py
+++ b/micropie.py
@@ -18,7 +18,7 @@ import time
import traceback
import uuid
from abc import ABC, abstractmethod
-from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
+from typing import Any, Awaitable, Callable, Dict, List, NamedTuple, Optional, Tuple
from urllib.parse import parse_qs, urlsplit, urlunsplit, quote
try:
@@ -41,6 +41,33 @@ except ImportError:
MULTIPART_INSTALLED = False
+_PARAM_EMPTY = inspect.Parameter.empty
+_VAR_POSITIONAL = inspect.Parameter.VAR_POSITIONAL
+_POSITIONAL_PARAM_KINDS = (
+ inspect.Parameter.POSITIONAL_ONLY,
+ inspect.Parameter.POSITIONAL_OR_KEYWORD,
+ inspect.Parameter.VAR_POSITIONAL,
+)
+_DEFAULT_CONTENT_TYPE = ("Content-Type", "text/html; charset=utf-8")
+_JSON_CONTENT_TYPE = ("Content-Type", "application/json")
+_DEFAULT_HEADER_BYTES = (b"Content-Type", b"text/html; charset=utf-8")
+_JSON_HEADER_BYTES = (b"Content-Type", b"application/json")
+_DEFAULT_HEADERS_BYTES = [_DEFAULT_HEADER_BYTES]
+_JSON_HEADERS_BYTES = [_JSON_HEADER_BYTES]
+
+
+class _HandlerParam(NamedTuple):
+ name: str
+ kind: Any
+ default: Any
+
+
+class _HandlerInfo(NamedTuple):
+ params: Tuple[_HandlerParam, ...]
+ accepts_params: bool
+ is_coroutine: bool
+
+
# -----------------------------
# Session Backend Abstraction
# -----------------------------
@@ -75,10 +102,16 @@ class InMemorySessionBackend(SessionBackend):
def __init__(self):
self.sessions: Dict[str, Dict[str, Any]] = {}
self.last_access: Dict[str, float] = {}
+ self._next_cleanup: float = 0.0
+ self._cleanup_interval: float = 60.0
- def _cleanup(self):
+ def _cleanup(self, now: Optional[float] = None, *, force: bool = False):
"""Remove expired sessions based on SESSION_TIMEOUT."""
- now = time.time()
+ if now is None:
+ now = time.time()
+ if not force and now < self._next_cleanup:
+ return
+ self._next_cleanup = now + self._cleanup_interval
expired = [
sid for sid, ts in self.last_access.items() if now - ts >= SESSION_TIMEOUT
]
@@ -87,11 +120,16 @@ class InMemorySessionBackend(SessionBackend):
self.last_access.pop(sid, None)
async def load(self, session_id: str) -> Dict[str, Any]:
- self._cleanup()
now = time.time()
- if session_id in self.sessions:
+ self._cleanup(now)
+ last_access = self.last_access.get(session_id)
+ if last_access is not None:
+ if now - last_access >= SESSION_TIMEOUT:
+ self.sessions.pop(session_id, None)
+ self.last_access.pop(session_id, None)
+ return {}
self.last_access[session_id] = now
- return self.sessions[session_id]
+ return self.sessions.get(session_id, {})
return {}
async def save(self, session_id: str, data: Dict[str, Any], timeout: int) -> None:
@@ -430,6 +468,7 @@ class App:
self.ws_middlewares: List[WebSocketMiddleware] = []
self.startup_handlers: List[Callable[[], Awaitable[None]]] = []
self.shutdown_handlers: List[Callable[[], Awaitable[None]]] = []
+ self._handler_cache: Dict[Any, _HandlerInfo] = {}
self._started: bool = False
@property
@@ -441,6 +480,49 @@ class App:
"""
return current_request.get()
+ def _get_handler_info(self, handler: Callable[..., Any]) -> _HandlerInfo:
+ """
+ Return cached metadata needed to bind and call a route handler.
+ """
+ cache_key = getattr(handler, "__func__", handler)
+ handler_info = self._handler_cache.get(cache_key)
+ if handler_info is not None:
+ return handler_info
+
+ params = []
+ accepts_params = False
+ for param in inspect.signature(handler).parameters.values():
+ if param.name == "self":
+ continue
+ if param.kind in _POSITIONAL_PARAM_KINDS:
+ accepts_params = True
+ params.append(_HandlerParam(param.name, param.kind, param.default))
+
+ handler_info = _HandlerInfo(
+ tuple(params), accepts_params, inspect.iscoroutinefunction(handler)
+ )
+ self._handler_cache[cache_key] = handler_info
+ return handler_info
+
+ async def _load_session_from_scope(
+ self, scope: Dict[str, Any], cookies: Dict[str, str]
+ ) -> Dict[str, Any]:
+ """
+ Return an existing ASGI session or load one when a session cookie exists.
+ """
+ if "session" in scope:
+ return scope["session"]
+ session_id = cookies.get("session_id")
+ if not session_id:
+ return {}
+ return await self.session_backend.load(session_id) or {}
+
+ def _parse_query_string(self, scope: Dict[str, Any]) -> Dict[str, List[str]]:
+ query_string = scope.get("query_string", b"")
+ if not query_string:
+ return {}
+ return parse_qs(query_string.decode("utf-8", "ignore"))
+
async def __call__(
self,
scope: Dict[str, Any],
@@ -550,14 +632,10 @@ class App:
try:
# Parse query/cookies/session
- request.query_params = parse_qs(
- scope.get("query_string", b"").decode("utf-8", "ignore")
- )
- cookies = self._parse_cookies(request.headers.get("cookie", ""))
- request.session = scope.get(
- "session",
- await self.session_backend.load(cookies.get("session_id", "")) or {},
- )
+ request.query_params = self._parse_query_string(scope)
+ cookie_header = request.headers.get("cookie", "")
+ cookies = self._parse_cookies(cookie_header) if cookie_header else {}
+ request.session = await self._load_session_from_scope(scope, cookies)
content_type = request.headers.get("content-type", "")
# Body parsing setup
@@ -585,13 +663,13 @@ class App:
)
)
else:
- body_data = bytearray()
+ body_chunks: List[bytes] = []
try:
async with asyncio.timeout(5): # Timeout after 5 seconds
while True:
msg = await receive()
if chunk := msg.get("body", b""):
- body_data += chunk
+ body_chunks.append(chunk)
if not msg.get("more_body"):
break
except asyncio.TimeoutError:
@@ -599,10 +677,13 @@ class App:
408, "408 Request Timeout: Failed to receive body"
)
return
- decoded_body = body_data.decode("utf-8", "ignore")
+ if len(body_chunks) == 1:
+ body_data = body_chunks[0]
+ else:
+ body_data = b"".join(body_chunks)
if "application/json" in content_type:
try:
- request.get_json = json.loads(decoded_body)
+ request.get_json = json.loads(body_data)
if isinstance(request.get_json, dict):
request.body_params = {
k: [str(v)] for k, v in request.get_json.items()
@@ -611,7 +692,9 @@ class App:
await _early_exit(400, "400 Bad Request: Bad JSON")
return
else:
- request.body_params = parse_qs(decoded_body)
+ request.body_params = parse_qs(
+ body_data.decode("utf-8", "ignore")
+ )
request.body_parsed = True
# HTTP middlewares (before)
@@ -667,53 +750,48 @@ class App:
if not handler:
await _early_exit(404, "404 Not Found")
return
+ handler_info = self._get_handler_info(handler)
# Initialize func_args early to avoid UnboundLocalError
func_args: List[Any] = []
# Check if index handler accepts parameters (for non-root paths)
if handler == getattr(self, "index", None) and path and path != "index":
- sig = inspect.signature(handler)
- accepts_params = any(
- param.kind
- in (
- inspect.Parameter.POSITIONAL_ONLY,
- inspect.Parameter.POSITIONAL_OR_KEYWORD,
- inspect.Parameter.VAR_POSITIONAL,
- )
- for param in sig.parameters.values()
- if param.name != "self"
- )
- if not accepts_params:
+ if not handler_info.accepts_params:
await _early_exit(404, "404 Not Found")
return
request.path_params = parts # Pass all path parts to index handler
# Build handler args (query/body/files/session)
- sig = inspect.signature(handler)
- path_params_copy = request.path_params[:]
-
- for param in sig.parameters.values():
- if param.name == "self":
- continue
- if param.kind == inspect.Parameter.VAR_POSITIONAL:
- func_args.extend(path_params_copy)
- path_params_copy = []
+ path_params = request.path_params
+ path_param_index = 0
+ path_param_count = len(path_params)
+ is_multipart = "multipart/form-data" in content_type
+ query_params = request.query_params
+ body_params = request.body_params
+ files = request.files
+ session = request.session
+
+ for param in handler_info.params:
+ if param.kind == _VAR_POSITIONAL:
+ func_args.extend(path_params[path_param_index:])
+ path_param_index = path_param_count
continue
param_value = None
- if path_params_copy:
- param_value = path_params_copy.pop(0)
- elif param.name in request.query_params:
- param_value = request.query_params[param.name][0]
- elif param.name in request.body_params:
- param_value = request.body_params[param.name][0]
- elif param.name in request.files:
- param_value = request.files[param.name]
- elif "multipart/form-data" in content_type:
+ if path_param_index < path_param_count:
+ param_value = path_params[path_param_index]
+ path_param_index += 1
+ elif param.name in query_params:
+ param_value = query_params[param.name][0]
+ elif param.name in body_params:
+ param_value = body_params[param.name][0]
+ elif param.name in files:
+ param_value = files[param.name]
+ elif is_multipart:
param_value = await _await_file_param(param.name)
- if param_value is None and param.default is param.empty:
+ if param_value is None and param.default is _PARAM_EMPTY:
await _early_exit(
400,
f"400 Bad Request: Missing required parameter '{param.name}'",
@@ -721,9 +799,9 @@ class App:
return
if param_value is None:
param_value = param.default
- elif param.name in request.session:
- param_value = request.session[param.name]
- elif param.default is not param.empty:
+ elif param.name in session:
+ param_value = session[param.name]
+ elif param.default is not _PARAM_EMPTY:
param_value = param.default
else:
await _early_exit(
@@ -738,7 +816,7 @@ class App:
try:
result = (
await handler(*func_args)
- if inspect.iscoroutinefunction(handler)
+ if handler_info.is_coroutine
else handler(*func_args)
)
except Exception:
@@ -797,24 +875,11 @@ class App:
# Handle async generators (e.g., SSE)
if hasattr(response_body, "__aiter__"):
- sanitized_headers: List[Tuple[str, str]] = []
- for k, v in extra_headers:
- if "\n" in k or "\r" in k or "\n" in v or "\r" in v:
- print(f"Header injection attempt detected: {k}: {v}")
- continue
- sanitized_headers.append((k, v))
- if not any(h[0].lower() == "content-type" for h in sanitized_headers):
- sanitized_headers.append(
- ("Content-Type", "text/html; charset=utf-8")
- )
await send(
{
"type": "http.response.start",
"status": status_code,
- "headers": [
- (k.encode("latin-1"), v.encode("latin-1"))
- for k, v in sanitized_headers
- ],
+ "headers": self._prepare_response_headers(extra_headers),
}
)
@@ -898,13 +963,10 @@ class App:
token = current_request.set(request)
try:
# Parse request details (query params, cookies, session)
- request.query_params = parse_qs(
- scope.get("query_string", b"").decode("utf-8", "ignore")
- )
- cookies = self._parse_cookies(request.headers.get("cookie", ""))
- request.session = (
- await self.session_backend.load(cookies.get("session_id", "")) or {}
- )
+ request.query_params = self._parse_query_string(scope)
+ cookie_header = request.headers.get("cookie", "")
+ cookies = self._parse_cookies(cookie_header) if cookie_header else {}
+ request.session = await self._load_session_from_scope(scope, cookies)
# Run WebSocket middleware before_websocket
for mw in self.ws_middlewares:
@@ -937,28 +999,33 @@ class App:
send, 1008, "No matching WebSocket route"
)
return
+ handler_info = self._get_handler_info(handler)
# Build function arguments
- sig = inspect.signature(handler)
func_args: List[Any] = []
- path_params_copy = request.path_params[:]
+ path_params = request.path_params
+ path_param_index = 0
+ path_param_count = len(path_params)
+ query_params = request.query_params
+ session = request.session
ws = WebSocket(receive, send)
func_args.append(ws) # First non-self parameter is WebSocket object
- for param in sig.parameters.values():
- if param.name in ("self", "ws"): # Skip self and ws parameters
+ for param in handler_info.params:
+ if param.name == "ws": # Skip the WebSocket parameter
continue
- if param.kind == inspect.Parameter.VAR_POSITIONAL:
- func_args.extend(path_params_copy)
- path_params_copy = []
+ if param.kind == _VAR_POSITIONAL:
+ func_args.extend(path_params[path_param_index:])
+ path_param_index = path_param_count
continue
param_value = None
- if path_params_copy:
- param_value = path_params_copy.pop(0)
- elif param.name in request.query_params:
- param_value = request.query_params[param.name][0]
- elif param.name in request.session:
- param_value = request.session[param.name]
- elif param.default is not param.empty:
+ if path_param_index < path_param_count:
+ param_value = path_params[path_param_index]
+ path_param_index += 1
+ elif param.name in query_params:
+ param_value = query_params[param.name][0]
+ elif param.name in session:
+ param_value = session[param.name]
+ elif param.default is not _PARAM_EMPTY:
param_value = param.default
else:
await self._send_websocket_close(
@@ -968,8 +1035,9 @@ class App:
func_args.append(param_value)
# Set session ID if needed
- session_id = cookies.get("session_id") or str(uuid.uuid4())
- ws.session_id = session_id
+ session_id = cookies.get("session_id")
+ had_session_id = bool(session_id)
+ ws.session_id = session_id or str(uuid.uuid4())
# Execute handler
try:
@@ -990,7 +1058,7 @@ class App:
await self.session_backend.save(
ws.session_id, request.session, SESSION_TIMEOUT
)
- else:
+ elif had_session_id:
# Treat empty session as logout/delete
await self.session_backend.save(ws.session_id, {}, 0)
@@ -1105,6 +1173,30 @@ class App:
if current_queue:
await current_queue.put(None)
+ def _prepare_response_headers(
+ self, extra_headers: Optional[List[Tuple[str, str]]] = None
+ ) -> List[Tuple[bytes, bytes]]:
+ if not extra_headers:
+ return _DEFAULT_HEADERS_BYTES
+ if len(extra_headers) == 1:
+ if extra_headers[0] == _DEFAULT_CONTENT_TYPE:
+ return _DEFAULT_HEADERS_BYTES
+ if extra_headers[0] == _JSON_CONTENT_TYPE:
+ return _JSON_HEADERS_BYTES
+
+ sanitized_headers: List[Tuple[bytes, bytes]] = []
+ has_content_type = False
+ for k, v in extra_headers:
+ if "\n" in k or "\r" in k or "\n" in v or "\r" in v:
+ print(f"Header injection attempt detected: {k}: {v}")
+ continue
+ if k.lower() == "content-type":
+ has_content_type = True
+ sanitized_headers.append((k.encode("latin-1"), v.encode("latin-1")))
+ if not has_content_type:
+ sanitized_headers.append(_DEFAULT_HEADER_BYTES)
+ return sanitized_headers
+
async def _send_response(
self,
send: Callable[[Dict[str, Any]], Awaitable[None]],
@@ -1122,24 +1214,11 @@ class App:
generator.
extra_headers: Optional list of extra header tuples.
"""
- if extra_headers is None:
- extra_headers = []
- sanitized_headers: List[Tuple[str, str]] = []
- for k, v in extra_headers:
- if "\n" in k or "\r" in k or "\n" in v or "\r" in v:
- print(f"Header injection attempt detected: {k}: {v}")
- continue
- sanitized_headers.append((k, v))
- if not any(h[0].lower() == "content-type" for h in sanitized_headers):
- sanitized_headers.append(("Content-Type", "text/html; charset=utf-8"))
await send(
{
"type": "http.response.start",
"status": status_code,
- "headers": [
- (k.encode("latin-1"), v.encode("latin-1"))
- for k, v in sanitized_headers
- ],
+ "headers": self._prepare_response_headers(extra_headers),
}
)
# Handle async generators (non-SSE cases; SSE is handled in _asgi_app_http)
diff --git a/tests.py b/tests.py
index f1d815d..a03ac12 100644
--- a/tests.py
+++ b/tests.py
@@ -1,4 +1,5 @@
import asyncio
+import json as std_json
import unittest
import uuid
from unittest.mock import AsyncMock, patch
@@ -14,6 +15,21 @@ from micropie import (
)
+class CountingSessionBackend(InMemorySessionBackend):
+ def __init__(self):
+ super().__init__()
+ self.load_calls = []
+ self.save_calls = []
+
+ async def load(self, session_id):
+ self.load_calls.append(session_id)
+ return await super().load(session_id)
+
+ async def save(self, session_id, data, timeout):
+ self.save_calls.append((session_id, dict(data), timeout))
+ await super().save(session_id, data, timeout)
+
+
class MicroPieTestCase(unittest.IsolatedAsyncioTestCase):
"""Base test case for MicroPie tests with common setup."""
@@ -211,6 +227,167 @@ class TestSession(MicroPieTestCase):
self.assertEqual(set_cookie_call["status"], 200, "Status should be 200")
+class TestFastPaths(MicroPieTestCase):
+ """Tests for optimized dispatch/session paths."""
+
+ async def test_request_without_cookie_skips_session_load(self):
+ """No session cookie should avoid a backend load call."""
+ backend = CountingSessionBackend()
+ self.app = App(session_backend=backend)
+
+ async def index(self):
+ return "OK"
+
+ setattr(self.app, "index", index.__get__(self.app, App))
+
+ scope = self.create_mock_scope(path="/index")
+ receive = AsyncMock(
+ return_value={"type": "http.request", "body": b"", "more_body": False}
+ )
+ send = AsyncMock()
+
+ await self.app(scope, receive, send)
+
+ self.assertEqual(backend.load_calls, [])
+ self.assertEqual(backend.save_calls, [])
+
+ async def test_request_with_session_cookie_loads_and_persists(self):
+ """A session cookie should still load and save the matching session."""
+ backend = CountingSessionBackend()
+ await backend.save("abc", {"user": "alice"}, SESSION_TIMEOUT)
+ backend.load_calls.clear()
+ backend.save_calls.clear()
+ self.app = App(session_backend=backend)
+
+ async def index(self):
+ self.request.session["seen"] = "1"
+ return self.request.session["user"]
+
+ setattr(self.app, "index", index.__get__(self.app, App))
+
+ scope = self.create_mock_scope(
+ path="/index", headers=[(b"cookie", b"session_id=abc")]
+ )
+ receive = AsyncMock(
+ return_value={"type": "http.request", "body": b"", "more_body": False}
+ )
+ send = AsyncMock()
+
+ await self.app(scope, receive, send)
+
+ self.assertEqual(backend.load_calls, ["abc"])
+ self.assertEqual(backend.save_calls[-1][0], "abc")
+ self.assertEqual(backend.sessions["abc"], {"user": "alice", "seen": "1"})
+ send.assert_any_call(
+ {"type": "http.response.body", "body": b"alice", "more_body": False}
+ )
+
+ async def test_cached_handler_metadata_tracks_replaced_handler(self):
+ """Replacing a handler should not reuse stale cached metadata."""
+
+ async def greet(self):
+ return "first"
+
+ setattr(self.app, "greet", greet.__get__(self.app, App))
+
+ scope = self.create_mock_scope(path="/greet")
+ receive = AsyncMock(
+ return_value={"type": "http.request", "body": b"", "more_body": False}
+ )
+ send = AsyncMock()
+ await self.app(scope, receive, send)
+ send.assert_any_call(
+ {"type": "http.response.body", "body": b"first", "more_body": False}
+ )
+
+ async def greet(self, name):
+ return f"second {name}"
+
+ setattr(self.app, "greet", greet.__get__(self.app, App))
+
+ scope = self.create_mock_scope(path="/greet/Alice")
+ receive = AsyncMock(
+ return_value={"type": "http.request", "body": b"", "more_body": False}
+ )
+ send = AsyncMock()
+ await self.app(scope, receive, send)
+ send.assert_any_call(
+ {
+ "type": "http.response.body",
+ "body": b"second Alice",
+ "more_body": False,
+ }
+ )
+
+ async def test_http_argument_binding_fast_path_matches_existing_sources(self):
+ """Path, query, body, session, and defaults should still bind in order."""
+
+ async def combine(self, path_value, query_value, body_value, user, suffix="ok"):
+ return {
+ "path": path_value,
+ "query": query_value,
+ "body": body_value,
+ "user": user,
+ "suffix": suffix,
+ }
+
+ setattr(self.app, "combine", combine.__get__(self.app, App))
+
+ scope = self.create_mock_scope(
+ path="/combine/path-part",
+ method="POST",
+ headers=[(b"content-type", b"application/x-www-form-urlencoded")],
+ query_string=b"query_value=query-part",
+ )
+ scope["session"] = {"user": "alice"}
+ receive = AsyncMock(
+ return_value={
+ "type": "http.request",
+ "body": b"body_value=body-part",
+ "more_body": False,
+ }
+ )
+ send = AsyncMock()
+
+ await self.app(scope, receive, send)
+
+ body_call = next(
+ call[0][0]
+ for call in send.call_args_list
+ if call[0][0]["type"] == "http.response.body"
+ )
+ self.assertEqual(
+ std_json.loads(body_call["body"]),
+ {
+ "path": "path-part",
+ "query": "query-part",
+ "body": "body-part",
+ "user": "alice",
+ "suffix": "ok",
+ },
+ )
+
+ async def test_websocket_argument_binding_uses_cached_metadata(self):
+ """WebSocket path and query argument binding should be unchanged."""
+
+ async def ws_chat(self, ws, room, user="guest"):
+ await ws.accept()
+ await ws.send_text(f"{room}:{user}")
+ await ws.close()
+
+ setattr(self.app, "ws_chat", ws_chat.__get__(self.app, App))
+
+ scope = self.create_mock_scope(
+ path="/chat/lobby", query_string=b"user=alice", scope_type="websocket"
+ )
+ receive = AsyncMock(return_value={"type": "websocket.connect"})
+ send = AsyncMock()
+
+ await self.app(scope, receive, send)
+
+ send.assert_any_call({"type": "websocket.send", "text": "lobby:alice"})
+
+
class TestRouting(MicroPieTestCase):
"""Tests for HTTP and WebSocket routing."""