patx/mrhttp-asgi
port to asgi compatibility
Commit d87bc47 · patx · 2026-05-11T22:20:51-04:00
Comments
No comments yet.
Diff
diff --git a/README.md b/README.md
index 4f94688..af5410b 100644
--- a/README.md
+++ b/README.md
@@ -1,72 +1,74 @@
-# Mrhttp
-Async Python 3.5+ web server written in C
+# Mrhttp-ASGI
-# Benchmarks
+ASGI compatible Python 3.5+ web server written in C
-```
- Pipelined
- Hello (cached) 8,534,332 Requests/second
- Hello 6,834,994 Requests/second
- More hdrs 6,193,307 Requests/second
- Sessions 4,396,364 Requests/second
- File Upload 3,510,289 Requests/second
- mrpacker 2,052,674 Requests/second
- Form 1,182,228 Requests/second
-
- One by one
- Hello 707,667 Requests/second
- Hello hdrs 728,639 Requests/second
- Cookies 588,212 Requests/second
- many args 691,910 Requests/second
- 404 natural 763,643 Requests/second
- 404 580,424 Requests/second
- Form parsing 338,553 Requests/second
- mrpacker 533,242 Requests/second
- Sessions 325,354 Requests/second
- File Upload 292,331 Requests/second
- get ip 503,454 Requests/second
-
-```
-Versus sanic a pure python async server
+# Install
+
+Clone the repo:
```
-Hello World 22,366 Requests/second
-Cookies 20,867 Requests/second
-404 8,256 Requests/second
-forms 11,104 Requests/second
-sessions 4,053 Requests/second
-File upload 1,457 Requests/second
+$ git clone https://gitman.io/git/patx/mrhttp-asgi
+$ cd mrhttp-asgi
+$ pip3 install .
```
-Hello World Example
--------------------
-```python
+# Use an existing ASGI app
+Just import `mrhttp` and use `mrhttp.run()`:
+
+```
import mrhttp
+from micropie import App
+
+class Root(App):
+ async def index(self):
+ return {"hello": "world"}
-app = mrhttp.Application()
+app = Root()
+mrhttp.run(app, host="127.0.0.1", port=8080, workers=2, lifespan="on")
+```
[email protected]('/')
-def hello(r):
- return 'Hello World!'
-app.run(cores=2)
+# Benchmarks
-```
+The above app was ran with 8 workers on `uvicorn`, `granian`, and `mrhttp-asi`.
+It was tested with `wrk` using 4 threads and 1000 connections for 15 seconds.
-Installation
-------------
+## Mrhttp-ASGI
```
-sudo apt install python3-dev -y
-pip3 install mrhttp
+ 4 threads and 1000 connections
+ Thread Stats Avg Stdev Max +/- Stdev
+ Latency 2.49ms 2.17ms 209.48ms 88.60%
+ Req/Sec 93.09k 8.76k 119.14k 75.83%
+ 5581724 requests in 15.10s, 777.18MB read
+Requests/sec: 369706.08
+Transfer/sec: 51.48MB
+
```
-Building from source
+## Granian
```
-pip install .
+ 4 threads and 1000 connections
+ Thread Stats Avg Stdev Max +/- Stdev
+ Latency 2.81ms 1.54ms 16.44ms 69.06%
+ Req/Sec 79.32k 11.43k 117.37k 71.83%
+ 4750339 requests in 15.08s, 643.30MB read
+Requests/sec: 314993.15
+Transfer/sec: 42.66MB
```
+## Uvicorn
+
+```
+ 4 threads and 1000 connections
+ Thread Stats Avg Stdev Max +/- Stdev
+ Latency 10.90ms 10.02ms 341.61ms 97.34%
+ Req/Sec 24.03k 1.51k 28.20k 81.67%
+ 1436933 requests in 15.03s, 220.63MB read
+Requests/sec: 95577.79
+Transfer/sec: 14.68MB
+```
diff --git a/setup.py b/setup.py
index 94d9ede..5ffb768 100644
--- a/setup.py
+++ b/setup.py
@@ -39,6 +39,11 @@ setup(
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
ext_modules = [m1],
+ entry_points={
+ 'console_scripts': [
+ 'mrhttp=mrhttp.asgi:main',
+ ],
+ },
package_dir={'':'src'},
packages=find_packages('src'),# + ['prof'],
#package_data={'prof': ['prof.so']},
@@ -65,4 +70,3 @@ setup(
'Topic :: Internet :: WWW/HTTP'
]
)
-
diff --git a/src/mrhttp/__init__.py b/src/mrhttp/__init__.py
index 6949204..143eca5 100644
--- a/src/mrhttp/__init__.py
+++ b/src/mrhttp/__init__.py
@@ -12,14 +12,17 @@ from .memcachedclient import MemcachedClient
from .internals import MrqProtocol
from .internals import MrqClient as CMrqClient
-from .mrqclient import MrqClient
+try:
+ from .mrqclient import MrqClient
+except ImportError:
+ MrqClient = None
from .internals import MrcacheProtocol
from .internals import MrcacheClient as CMrcacheClient
from .mrcacheclient import MrcacheClient
from .app import *
+from .asgi import ASGIConnectionClosed, ASGIProtocolError, ASGIServer, import_from_string, run
from .internals import randint, escape, to64, from64, timesince
__version__=0.13
-
diff --git a/src/mrhttp/__main__.py b/src/mrhttp/__main__.py
new file mode 100644
index 0000000..c98557c
--- /dev/null
+++ b/src/mrhttp/__main__.py
@@ -0,0 +1,5 @@
+from .asgi import main
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/mrhttp/app.py b/src/mrhttp/app.py
index 9ad06de..148c659 100644
--- a/src/mrhttp/app.py
+++ b/src/mrhttp/app.py
@@ -8,7 +8,7 @@ import signal
import asyncio
import traceback
import socket
-import os, sys, random, mrpacker, time
+import os, sys, random, time
from glob import glob
import multiprocessing
import faulthandler
@@ -19,6 +19,11 @@ import inspect, copy
#from prof import profiler_start,profiler_stop
import http.cookies
+try:
+ import mrpacker
+except ImportError:
+ mrpacker = None
+
import mrhttp
from mrhttp import Protocol
from mrhttp.request import Request
@@ -487,4 +492,3 @@ class Application(mrhttp.CApp):
self._session_client = self._mrc
app = Application()
-
diff --git a/src/mrhttp/asgi.py b/src/mrhttp/asgi.py
new file mode 100644
index 0000000..f3120ec
--- /dev/null
+++ b/src/mrhttp/asgi.py
@@ -0,0 +1,466 @@
+import argparse
+import asyncio
+import email.utils
+import importlib
+import multiprocessing
+import os
+import signal
+import socket
+import sys
+from http.client import responses
+
+import uvloop
+
+import mrhttp
+from mrhttp import Protocol
+from mrhttp.internals import _build_asgi_response
+from mrhttp.request import Request
+
+
+asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
+
+
+class ASGIConnectionClosed(OSError):
+ pass
+
+
+class ASGIProtocolError(RuntimeError):
+ pass
+
+
+_EMPTY_HTTP_REQUEST = {"type": "http.request", "body": b"", "more_body": False}
+_HTTP_DISCONNECT = {"type": "http.disconnect"}
+_STATUS_LINE_CACHE = {}
+
+
+def _status_line(version, status):
+ key = (version, status)
+ line = _STATUS_LINE_CACHE.get(key)
+ if line is None:
+ line = "HTTP/{} {} {}\r\n".format(
+ version,
+ status,
+ responses.get(status, ""),
+ ).encode("latin-1")
+ _STATUS_LINE_CACHE[key] = line
+ return line
+
+
+class _ASGIResponder:
+ __slots__ = (
+ "scope",
+ "request_message",
+ "_request_sent",
+ "started",
+ "closed",
+ "status",
+ "headers",
+ "body",
+ "body_parts",
+ "date",
+ "has_content_length",
+ "has_connection",
+ "has_date",
+ "has_server",
+ "http_version",
+ "head_response",
+ "status_line",
+ )
+
+ def __init__(self, scope, body, date):
+ self.scope = scope
+ self.request_message = {"type": "http.request", "body": body or b"", "more_body": False}
+ self._request_sent = False
+ self.started = False
+ self.closed = False
+ self.status = 500
+ self.headers = []
+ self.body = b""
+ self.body_parts = None
+ self.date = date
+ self.has_content_length = False
+ self.has_connection = False
+ self.has_date = False
+ self.has_server = False
+ self.http_version = scope.get("http_version", "1.1")
+ self.head_response = scope.get("method") == "HEAD"
+ self.status_line = _status_line(self.http_version, self.status)
+
+ async def receive(self):
+ if not self._request_sent:
+ self._request_sent = True
+ return self.request_message
+ if self.closed:
+ return _HTTP_DISCONNECT
+ return _EMPTY_HTTP_REQUEST
+
+ async def send(self, message):
+ if self.closed:
+ raise ASGIConnectionClosed("ASGI connection is already closed")
+ if not isinstance(message, dict):
+ raise ASGIProtocolError("ASGI send message must be a dict")
+
+ typ = message.get("type")
+ if typ == "http.response.start":
+ if self.started:
+ raise ASGIProtocolError("http.response.start sent twice")
+ status = message.get("status")
+ if not isinstance(status, int):
+ raise ASGIProtocolError("http.response.start status must be an int")
+ raw_headers = message.get("headers", [])
+ headers = []
+ has_content_length = False
+ has_connection = False
+ has_date = False
+ has_server = False
+
+ for header in raw_headers:
+ name, value = _normalize_header(header)
+ if name.startswith(b":"):
+ raise ASGIProtocolError("HTTP/2 pseudo headers are not valid in HTTP responses")
+ if name == b"transfer-encoding":
+ continue
+ if name == b"content-length":
+ has_content_length = True
+ elif name == b"connection":
+ has_connection = True
+ elif name == b"date":
+ has_date = True
+ elif name == b"server":
+ has_server = True
+ headers.append((name, value))
+
+ self.headers = headers
+ self.has_content_length = has_content_length
+ self.has_connection = has_connection
+ self.has_date = has_date
+ self.has_server = has_server
+ self.status = status
+ self.status_line = _status_line(self.http_version, status)
+ self.started = True
+ return
+
+ if typ == "http.response.body":
+ if not self.started:
+ raise ASGIProtocolError("http.response.body sent before http.response.start")
+ body = message.get("body", b"")
+ if isinstance(body, str):
+ body = body.encode("latin-1")
+ if body:
+ if not isinstance(body, (bytes, bytearray, memoryview)):
+ raise ASGIProtocolError("http.response.body body must be bytes")
+ if not isinstance(body, bytes):
+ body = bytes(body)
+ if self.body_parts is not None:
+ self.body_parts.append(body)
+ elif self.body:
+ self.body_parts = [self.body, body]
+ self.body = b""
+ else:
+ self.body = body
+ if not message.get("more_body", False):
+ self.closed = True
+ return
+
+ raise ASGIProtocolError("Unsupported ASGI send message type: {}".format(typ))
+
+ def to_http(self, keep_alive):
+ if self.body_parts is None:
+ body = self.body
+ else:
+ body = b"".join(self.body_parts)
+ return _build_asgi_response(
+ self.status_line,
+ self.headers,
+ body,
+ self.date,
+ bool(keep_alive),
+ self.head_response,
+ self.has_content_length,
+ self.has_server,
+ self.has_date,
+ self.has_connection,
+ )
+
+
+def _normalize_header(header):
+ if not isinstance(header, (tuple, list)) or len(header) != 2:
+ raise ASGIProtocolError("ASGI headers must be two-item byte pairs")
+ name, value = header
+ if isinstance(name, str):
+ name = name.encode("latin-1")
+ if isinstance(value, str):
+ value = value.encode("latin-1")
+ if not isinstance(name, (bytes, bytearray, memoryview)):
+ raise ASGIProtocolError("ASGI header names must be bytes")
+ if not isinstance(value, (bytes, bytearray, memoryview)):
+ raise ASGIProtocolError("ASGI header values must be bytes")
+ name = bytes(name).lower()
+ value = bytes(value)
+ if b"\r" in name or b"\n" in name or b":" in name:
+ raise ASGIProtocolError("Invalid ASGI response header name")
+ if b"\r" in value or b"\n" in value:
+ raise ASGIProtocolError("Invalid ASGI response header value")
+ return name, value
+
+
+def _error_response(status, message):
+ reason = responses.get(status, "Error")
+ body = (
+ "<html><head><title>{} {}</title></head>"
+ "<body><h1>{}</h1><p>{}</p></body></html>"
+ ).format(status, reason, reason, message).encode("utf-8")
+ return b"".join([
+ "HTTP/1.1 {} {}\r\n".format(status, reason).encode("ascii"),
+ b"server: MrHTTP/0.13\r\n",
+ b"content-type: text/html; charset=utf-8\r\n",
+ b"content-length: " + str(len(body)).encode("ascii") + b"\r\n",
+ b"connection: close\r\n",
+ b"\r\n",
+ body,
+ ])
+
+
+class _LifespanConnection:
+ def __init__(self, app, state, mode):
+ self.app = app
+ self.state = state
+ self.mode = mode
+ self.receive_queue = asyncio.Queue()
+ self.send_queue = asyncio.Queue()
+ self.task = None
+ self.supported = False
+
+ async def receive(self):
+ return await self.receive_queue.get()
+
+ async def send(self, message):
+ await self.send_queue.put(message)
+
+ async def startup(self):
+ if self.mode == "off":
+ return
+
+ scope = {
+ "type": "lifespan",
+ "asgi": {"version": "3.0", "spec_version": "2.0"},
+ "state": self.state,
+ }
+ self.task = asyncio.create_task(self.app(scope, self.receive, self.send))
+ await self.receive_queue.put({"type": "lifespan.startup"})
+
+ try:
+ message = await asyncio.wait_for(self.send_queue.get(), timeout=10)
+ except Exception:
+ if self.mode == "auto" and self.task.done():
+ try:
+ self.task.result()
+ except Exception:
+ return
+ raise
+
+ typ = message.get("type")
+ if typ == "lifespan.startup.complete":
+ self.supported = True
+ return
+ if typ == "lifespan.startup.failed":
+ raise RuntimeError(message.get("message", "ASGI lifespan startup failed"))
+ raise ASGIProtocolError("Unexpected lifespan startup message: {}".format(typ))
+
+ async def shutdown(self):
+ if not self.supported or not self.task:
+ return
+ await self.receive_queue.put({"type": "lifespan.shutdown"})
+ message = await asyncio.wait_for(self.send_queue.get(), timeout=10)
+ typ = message.get("type")
+ if typ == "lifespan.shutdown.failed":
+ raise RuntimeError(message.get("message", "ASGI lifespan shutdown failed"))
+ if typ != "lifespan.shutdown.complete":
+ raise ASGIProtocolError("Unexpected lifespan shutdown message: {}".format(typ))
+ await self.task
+
+
+class ASGIServer(mrhttp.CApp):
+ def __init__(self, asgi_app, *, lifespan="auto", root_path="", scheme="http"):
+ self._loop = None
+ self._connections = set()
+ self._protocol_factory = Protocol
+ self._asgi_app = asgi_app
+ self._asgi_handle = self._handle_asgi
+ self._asgi_root_path = root_path
+ self._asgi_scheme = scheme
+ self._asgi_state = {}
+ self._asgi_spec = {"version": "3.0", "spec_version": "2.5"}
+ self._lifespan_mode = lifespan
+ self._lifespan = None
+ self._date_header_value = email.utils.formatdate(usegmt=True).encode("ascii")
+ self._date_handle = None
+ self.router = None
+ self.requests = None
+ self.session_backend_type = 0
+ self.err404 = "<html><head><title>404 Not Found</title></head><body><h1>Not Found</h1><p>The requested page was not found</p></body></html>"
+ self.err400 = "<html><head><title>400 Bad Request</title></head><body><p>Invalid Request</p></body></html>"
+
+ @property
+ def loop(self):
+ if not self._loop:
+ self._loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self._loop)
+ return self._loop
+
+ def expand_requests(self):
+ for _ in range(len(self.requests)):
+ self.requests.append(Request())
+
+ def _get_idle_and_busy_connections(self):
+ return \
+ [c for c in self._connections if c.pipeline_empty], \
+ [c for c in self._connections if not c.pipeline_empty]
+
+ def _update_date_header(self):
+ self._date_header_value = email.utils.formatdate(usegmt=True).encode("ascii")
+ self._date_handle = self.loop.call_later(1, self._update_date_header)
+
+ async def drain(self):
+ idle, busy = self._get_idle_and_busy_connections()
+ for c in idle:
+ c.transport.close()
+ for _ in range(5):
+ if not busy:
+ return
+ await asyncio.sleep(1)
+ idle, busy = self._get_idle_and_busy_connections()
+ for c in idle:
+ c.transport.close()
+ for c in busy:
+ c.pipeline_cancel()
+
+ async def _handle_asgi(self, scope, body, keep_alive):
+ responder = _ASGIResponder(scope, body, self._date_header_value)
+ try:
+ await self._asgi_app(scope, responder.receive, responder.send)
+ except Exception:
+ if not responder.started:
+ return _error_response(500, "The ASGI application raised before starting a response.")
+ raise
+
+ if not responder.started:
+ return _error_response(500, "The ASGI application completed without starting a response.")
+ return responder.to_http(bool(keep_alive))
+
+ def serve(self, *, sock, host, port, loop, run_async=False):
+ if not loop:
+ loop = self.loop
+ else:
+ self._loop = loop
+ asyncio.set_event_loop(loop)
+
+ self.requests = [Request() for _ in range(128)]
+ self.cinit()
+ self._lifespan = _LifespanConnection(self._asgi_app, self._asgi_state, self._lifespan_mode)
+ self._update_date_header()
+
+ async def start():
+ await self._lifespan.startup()
+ return await loop.create_server(lambda: self._protocol_factory(self), sock=sock)
+
+ if run_async:
+ return start()
+
+ server = loop.run_until_complete(start())
+ print("Accepting ASGI connections on http://{}:{}".format(host, port))
+ loop.add_signal_handler(signal.SIGTERM, loop.stop)
+
+ try:
+ loop.run_forever()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ try:
+ loop.run_until_complete(self._lifespan.shutdown())
+ finally:
+ if self._date_handle is not None:
+ self._date_handle.cancel()
+ self._date_handle = None
+ loop.run_until_complete(loop.shutdown_asyncgens())
+ loop.run_until_complete(self.drain())
+ self._connections.clear()
+ server.close()
+ loop.run_until_complete(server.wait_closed())
+ for r in self.requests:
+ r.cleanup()
+ self.requests = None
+ loop.close()
+
+ def _run(self, *, host, port, num_workers=None):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((host, port))
+ os.set_inheritable(sock.fileno(), True)
+
+ workers = set()
+ terminating = False
+
+ def stop(sig, frame):
+ nonlocal terminating
+ terminating = True
+ for worker in workers:
+ worker.terminate()
+
+ signal.signal(signal.SIGTERM, stop)
+
+ for _ in range(num_workers or 1):
+ worker = multiprocessing.Process(
+ target=self.serve,
+ kwargs=dict(sock=sock, host=host, port=port, loop=None),
+ )
+ worker.daemon = True
+ worker.start()
+ workers.add(worker)
+
+ sock.close()
+ for worker in workers:
+ try:
+ worker.join()
+ except KeyboardInterrupt:
+ if not terminating:
+ worker.terminate()
+
+ def run(self, host="0.0.0.0", port=8080, *, workers=None):
+ self._run(host=host, port=port, num_workers=workers)
+
+
+def import_from_string(value):
+ if ":" not in value:
+ raise ValueError("ASGI app must be specified as module:attribute")
+ module_name, attr_path = value.split(":", 1)
+ module = importlib.import_module(module_name)
+ obj = module
+ for attr in attr_path.split("."):
+ obj = getattr(obj, attr)
+ return obj
+
+
+def run(app, host="0.0.0.0", port=8080, *, workers=None, cores=None, lifespan="auto", root_path="", scheme="http"):
+ if cores is not None and workers is None:
+ workers = cores
+ server = ASGIServer(app, lifespan=lifespan, root_path=root_path, scheme=scheme)
+ return server.run(host=host, port=port, workers=workers)
+
+
+def main(argv=None):
+ parser = argparse.ArgumentParser(prog="mrhttp")
+ parser.add_argument("app", help="ASGI application as module:attribute")
+ parser.add_argument("--host", default="0.0.0.0")
+ parser.add_argument("--port", type=int, default=8080)
+ parser.add_argument("--workers", type=int, default=1)
+ parser.add_argument("--lifespan", choices=("auto", "on", "off"), default="auto")
+ parser.add_argument("--root-path", default="")
+ parser.add_argument("--scheme", default="http")
+ args = parser.parse_args(argv)
+ app = import_from_string(args.app)
+ run(app, host=args.host, port=args.port, workers=args.workers, lifespan=args.lifespan, root_path=args.root_path, scheme=args.scheme)
+
+
+if __name__ == "__main__":
+ main(sys.argv[1:])
diff --git a/src/mrhttp/internals/module.h b/src/mrhttp/internals/module.h
index b4d87ed..9ff50d1 100644
--- a/src/mrhttp/internals/module.h
+++ b/src/mrhttp/internals/module.h
@@ -30,6 +30,7 @@ static PyMethodDef mod_methods[] = {
{"from64", (PyCFunction)from64, METH_O, "Base64 string to number" },
{"timesince", (PyCFunction)timesince, METH_O, "Timestamp difference as string"},
{"hot", (PyCFunction)hot, METH_VARARGS, "Chatt1r's hot algorithm" },
+ {"_build_asgi_response", (PyCFunction)asgi_build_response, METH_VARARGS, "Build a raw ASGI HTTP response"},
{NULL}
};
@@ -597,4 +598,3 @@ static PyTypeObject MrcacheClientType = {
MrcacheClient_new, /* tp_new */
};
-
diff --git a/src/mrhttp/internals/mrhttpparser.c b/src/mrhttp/internals/mrhttpparser.c
index faf888d..d614759 100644
--- a/src/mrhttp/internals/mrhttpparser.c
+++ b/src/mrhttp/internals/mrhttpparser.c
@@ -167,16 +167,23 @@ static const char *parse_request(const char *buf, const char *buf_end, const cha
++buf;
}
- // parse request line
- // TODO Support other methods
+ // parse request line. Keep the hot GET/POST paths, then fall back to
+ // generic token parsing for ASGI frameworks that use HEAD/PUT/PATCH/etc.
switch (*(unsigned int *)buf) {
case CHAR4_TO_INT('G', 'E', 'T', ' '):
*method = buf; *method_len = 3; buf += 4; break;
case CHAR4_TO_INT('P', 'O', 'S', 'T'):
*method = buf; *method_len = 4; buf += 5; break;
default:
- *ret = -1;
- return NULL;
+ *method = buf;
+ int ml = get_len_to_space(buf, buf_end);
+ if ( ml <= 0 ) {
+ *ret = -1;
+ return NULL;
+ }
+ *method_len = ml;
+ buf += ml + 1;
+ break;
}
*path = buf;
int l = get_len_to_space(buf, buf_end);
diff --git a/src/mrhttp/internals/parser.c b/src/mrhttp/internals/parser.c
index 3b9599a..cdebe6d 100644
--- a/src/mrhttp/internals/parser.c
+++ b/src/mrhttp/internals/parser.c
@@ -35,6 +35,103 @@ static long my_strtol( char* s, int maxlen ) {
return l;
}
+static int scan_chunked(char *buf, size_t len, size_t *consumed_len) {
+ char *src = buf;
+ char *end = buf + len;
+
+ while (1) {
+ size_t chunk_len = 0;
+ int saw_digit = 0;
+
+ while (src < end) {
+ char c = *src;
+ if (c >= '0' && c <= '9') {
+ chunk_len = (chunk_len << 4) + (size_t)(c - '0');
+ saw_digit = 1;
+ src++;
+ } else if (c >= 'a' && c <= 'f') {
+ chunk_len = (chunk_len << 4) + (size_t)(c - 'a' + 10);
+ saw_digit = 1;
+ src++;
+ } else if (c >= 'A' && c <= 'F') {
+ chunk_len = (chunk_len << 4) + (size_t)(c - 'A' + 10);
+ saw_digit = 1;
+ src++;
+ } else {
+ break;
+ }
+ }
+
+ if (!saw_digit) return -1;
+
+ while (src < end && *src != '\r') {
+ src++;
+ }
+ if ((end - src) < 2) return -2;
+ if (src[0] != '\r' || src[1] != '\n') return -1;
+ src += 2;
+
+ if (chunk_len == 0) {
+ while (1) {
+ if ((end - src) < 2) return -2;
+ if (src[0] == '\r' && src[1] == '\n') {
+ src += 2;
+ *consumed_len = (size_t)(src - buf);
+ return 1;
+ }
+ while (src < end && *src != '\r') {
+ src++;
+ }
+ if ((end - src) < 2) return -2;
+ if (src[0] != '\r' || src[1] != '\n') return -1;
+ src += 2;
+ }
+ }
+
+ if ((size_t)(end - src) < chunk_len + 2) return -2;
+ src += chunk_len;
+ if (src[0] != '\r' || src[1] != '\n') return -1;
+ src += 2;
+ }
+}
+
+static int decode_chunked_inplace(char *buf, size_t len, size_t *decoded_len, size_t *consumed_len) {
+ int scan_rc = scan_chunked(buf, len, consumed_len);
+ if (scan_rc <= 0) return scan_rc;
+
+ char *src = buf;
+ char *dst = buf;
+ char *end = buf + *consumed_len;
+
+ while (src < end) {
+ size_t chunk_len = 0;
+ while (src < end) {
+ char c = *src;
+ if (c >= '0' && c <= '9') {
+ chunk_len = (chunk_len << 4) + (size_t)(c - '0');
+ } else if (c >= 'a' && c <= 'f') {
+ chunk_len = (chunk_len << 4) + (size_t)(c - 'a' + 10);
+ } else if (c >= 'A' && c <= 'F') {
+ chunk_len = (chunk_len << 4) + (size_t)(c - 'A' + 10);
+ } else {
+ break;
+ }
+ src++;
+ }
+ while (src < end && *src != '\r') src++;
+ src += 2;
+ if (chunk_len == 0) {
+ *decoded_len = (size_t)(dst - buf);
+ return 1;
+ }
+ memmove(dst, src, chunk_len);
+ dst += chunk_len;
+ src += chunk_len + 2;
+ }
+
+ return -1;
+}
+
static void _reset(Parser* self, bool reset_buffer) {
self->body_length = 0;
if ( reset_buffer ) {
@@ -91,6 +188,8 @@ parse_headers:
char *method, *path;
int rc, minor_version;
size_t method_len, path_len;
+ bool chunked = false;
+ size_t consumed_body_length = 0;
request->num_headers = 100; // Max allowed headers
DBG_PARSER printf("before parser requests\n");
@@ -142,6 +241,8 @@ parse_headers:
} else if(name_compare("Connection")) {
if (value_compare("close")) request->keep_alive = false;
+ } else if(name_compare("Transfer-Encoding")) {
+ if (header->value_len == 7 && strncasecmp(header->value, "chunked", 7) == 0) chunked = true;
}
}
@@ -151,8 +252,18 @@ parse_headers:
DBG_PARSER printf("body:\n%.*s\n", (int)(self->end-self->start),self->start);
+ if (chunked) {
+ size_t decoded_len = 0;
+ int chunked_rc = decode_chunked_inplace(self->start, self->end - self->start, &decoded_len, &consumed_body_length);
+ if (chunked_rc == -2) return -2;
+ if (chunked_rc < 0) goto error;
+ self->body_length = decoded_len;
+ } else {
+ consumed_body_length = self->body_length;
+ }
+
// Need more data
- if ( self->body_length > ( self->end - self->start ) ) {
+ if ( consumed_body_length > ( self->end - self->start ) ) {
while ( (self->body_length+(self->end-self->start)) > self->buf_size ) self->buf_size *= 2;
int l = (self->end - self->buf);
self->buf = realloc( self->buf, self->buf_size );
@@ -176,7 +287,7 @@ parse_headers:
if(!Protocol_on_body(self->protocol, self->start, self->body_length)) return -1;
- self->start += self->body_length;
+ self->start += consumed_body_length;
// If we still have data start parsing the next request
if ( self->start < self->end ) {
diff --git a/src/mrhttp/internals/protocol.c b/src/mrhttp/internals/protocol.c
index feb856f..9778b56 100644
--- a/src/mrhttp/internals/protocol.c
+++ b/src/mrhttp/internals/protocol.c
@@ -12,12 +12,16 @@
#include "utils.h"
#include "Python.h"
+#include <ctype.h>
#include <errno.h>
#include <string.h>
+#include <strings.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
+static PyObject* protocol_get_extra_info(Protocol* self, const char* name);
+
// DELME
static void print_buffer( char* b, int len ) {
for ( int z = 0; z < len; z++ ) {
@@ -46,6 +50,13 @@ PyObject * Protocol_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
self->transport = NULL;
self->write = NULL;
self->writelines = NULL;
+ self->asgi_handle = NULL;
+ self->asgi_root_path = NULL;
+ self->asgi_scheme = NULL;
+ self->asgi_state = NULL;
+ self->asgi_spec = NULL;
+ self->asgi_client = NULL;
+ self->asgi_server = NULL;
self->create_task = NULL;
self->task_done = NULL;
@@ -73,6 +84,13 @@ void Protocol_dealloc(Protocol* self)
//Py_XDECREF(self->transport);
Py_XDECREF(self->write);
Py_XDECREF(self->writelines);
+ Py_XDECREF(self->asgi_handle);
+ Py_XDECREF(self->asgi_root_path);
+ Py_XDECREF(self->asgi_scheme);
+ Py_XDECREF(self->asgi_state);
+ Py_XDECREF(self->asgi_spec);
+ Py_XDECREF(self->asgi_client);
+ Py_XDECREF(self->asgi_server);
Py_XDECREF(self->create_task);
Py_XDECREF(self->task_done);
Py_TYPE(self)->tp_free((PyObject*)self);
@@ -89,7 +107,28 @@ int Protocol_init(Protocol* self, PyObject *args, PyObject *kw)
self->request = (Request*)MrhttpApp_get_request( self->app );
//if(!(self->request = (Request*) PyObject_GetAttrString((PyObject*)self->app, "request" ))) return -1;
//if(!(self->response = (Response*)PyObject_GetAttrString((PyObject*)self->app, "response"))) return -1;
- if(!(self->router = (Router*) PyObject_GetAttrString((PyObject*)self->app, "router" ))) return -1;
+ if(!(self->asgi_handle = PyObject_GetAttrString((PyObject*)self->app, "_asgi_handle"))) {
+ PyErr_Clear();
+ self->asgi_handle = NULL;
+ } else if ( self->asgi_handle == Py_None ) {
+ Py_DECREF(self->asgi_handle);
+ self->asgi_handle = NULL;
+ }
+ if ( self->asgi_handle ) {
+ if(!(self->asgi_root_path = PyObject_GetAttrString((PyObject*)self->app, "_asgi_root_path"))) return -1;
+ if(!(self->asgi_scheme = PyObject_GetAttrString((PyObject*)self->app, "_asgi_scheme"))) return -1;
+ if(!(self->asgi_state = PyObject_GetAttrString((PyObject*)self->app, "_asgi_state"))) return -1;
+ if(!(self->asgi_spec = PyObject_GetAttrString((PyObject*)self->app, "_asgi_spec"))) return -1;
+ }
+
+ if(!(self->router = (Router*)PyObject_GetAttrString((PyObject*)self->app, "router"))) {
+ if ( self->asgi_handle ) {
+ PyErr_Clear();
+ self->router = NULL;
+ } else {
+ return -1;
+ }
+ }
if ( !parser_init(&self->parser, self) ) return -1;
@@ -132,6 +171,12 @@ PyObject* Protocol_connection_made(Protocol* self, PyObject* transport)
if(!(self->write = PyObject_GetAttrString(transport, "write"))) return NULL;
if(!(self->writelines = PyObject_GetAttrString(transport, "writelines"))) return NULL;
+ if(self->asgi_handle) {
+ self->asgi_client = protocol_get_extra_info(self, "peername");
+ if(!self->asgi_client) return NULL;
+ self->asgi_server = protocol_get_extra_info(self, "sockname");
+ if(!self->asgi_server) return NULL;
+ }
Py_RETURN_NONE;
}
@@ -426,6 +471,232 @@ void Protocol_on_memcached_reply( SessionCallbackData *scd, char *data, int data
Py_DECREF(self);
}
+static inline bool request_header_name_eq(struct mr_header* header, const char* name) {
+ size_t name_len = strlen(name);
+ return header->name_len == name_len && strncasecmp(header->name, name, name_len) == 0;
+}
+
+static inline bool request_header_value_eq(struct mr_header* header, const char* value) {
+ size_t value_len = strlen(value);
+ return header->value_len == value_len && strncasecmp(header->value, value, value_len) == 0;
+}
+
+static bool request_is_websocket_upgrade(Request* request) {
+ for(struct mr_header* header = request->headers;
+ header < request->headers + request->num_headers;
+ header++) {
+ if(request_header_name_eq(header, "Upgrade") && request_header_value_eq(header, "websocket")) {
+ return true;
+ }
+ }
+ return false;
+}
+
+static PyObject* protocol_get_extra_info(Protocol* self, const char* name) {
+ PyObject* get_extra_info = NULL;
+ PyObject* py_name = NULL;
+ PyObject* result = NULL;
+
+ if(!(get_extra_info = PyObject_GetAttrString(self->transport, "get_extra_info"))) {
+ PyErr_Clear();
+ Py_RETURN_NONE;
+ }
+ if(!(py_name = PyUnicode_FromString(name))) goto error;
+ if(!(result = PyObject_CallFunctionObjArgs(get_extra_info, py_name, NULL))) {
+ PyErr_Clear();
+ Py_INCREF(Py_None);
+ result = Py_None;
+ }
+
+ goto finally;
+
+error:
+ Py_XDECREF(result);
+ result = NULL;
+
+finally:
+ Py_XDECREF(get_extra_info);
+ Py_XDECREF(py_name);
+ return result;
+}
+
+static int dict_set_steal(PyObject* dict, const char* key, PyObject* value) {
+ if(!value) return -1;
+ int rc = PyDict_SetItemString(dict, key, value);
+ Py_DECREF(value);
+ return rc;
+}
+
+static PyObject* protocol_build_asgi_headers(Request* request) {
+ PyObject* headers = PyList_New(request->num_headers);
+ if(!headers) return NULL;
+
+ for(size_t i = 0; i < request->num_headers; i++) {
+ struct mr_header* header = request->headers + i;
+ PyObject* name = NULL;
+ PyObject* value = NULL;
+ PyObject* item = NULL;
+ name = PyBytes_FromStringAndSize(NULL, header->name_len);
+ if(!name) goto loop_error;
+ char* lower_name = PyBytes_AS_STRING(name);
+ for(size_t j = 0; j < header->name_len; j++) {
+ lower_name[j] = (char)tolower((unsigned char)header->name[j]);
+ }
+
+ value = PyBytes_FromStringAndSize(header->value, header->value_len);
+ if(!value) goto loop_error;
+
+ item = PyTuple_New(2);
+ if(!item) goto loop_error;
+ PyTuple_SET_ITEM(item, 0, name);
+ PyTuple_SET_ITEM(item, 1, value);
+ name = NULL;
+ value = NULL;
+ PyList_SET_ITEM(headers, i, item);
+ continue;
+
+loop_error:
+ Py_XDECREF(name);
+ Py_XDECREF(value);
+ Py_XDECREF(item);
+ Py_DECREF(headers);
+ return NULL;
+ }
+
+ return headers;
+}
+
+static PyObject* protocol_build_asgi_scope(Protocol* self, Request* request) {
+ PyObject* scope = NULL;
+ PyObject* headers = NULL;
+ PyObject* state = NULL;
+ static PyObject* type_http = NULL;
+ static PyObject* http10 = NULL;
+ static PyObject* http11 = NULL;
+ static PyObject* empty_bytes = NULL;
+
+ char* query = memchr(request->path, '?', request->path_len);
+ size_t raw_path_len = query ? (size_t)(query - request->path) : request->path_len;
+ size_t query_len = query ? request->path_len - raw_path_len - 1 : 0;
+
+ if(!type_http && !(type_http = PyUnicode_FromString("http"))) goto error;
+ if(!http10 && !(http10 = PyUnicode_FromString("1.0"))) goto error;
+ if(!http11 && !(http11 = PyUnicode_FromString("1.1"))) goto error;
+ if(!empty_bytes && !(empty_bytes = PyBytes_FromStringAndSize("", 0))) goto error;
+
+ scope = PyDict_New();
+ if(!scope) goto error;
+
+ if(PyDict_SetItemString(scope, "type", type_http) == -1) goto error;
+ if(PyDict_SetItemString(scope, "asgi", self->asgi_spec) == -1) goto error;
+ if(PyDict_SetItemString(scope, "http_version", request->minor_version == 0 ? http10 : http11) == -1) goto error;
+ if(dict_set_steal(scope, "method", PyUnicode_FromStringAndSize(request->method, request->method_len)) == -1) goto error;
+ if(dict_set_steal(scope, "raw_path", PyBytes_FromStringAndSize(request->path, raw_path_len)) == -1) goto error;
+ if(query_len) {
+ if(dict_set_steal(scope, "query_string", PyBytes_FromStringAndSize(query + 1, query_len)) == -1) goto error;
+ } else {
+ if(PyDict_SetItemString(scope, "query_string", empty_bytes) == -1) goto error;
+ }
+
+ request_decodePath(request);
+ if(dict_set_steal(scope, "path", PyUnicode_FromStringAndSize(request->path, request->path_len)) == -1) goto error;
+
+ if(PyDict_SetItemString(scope, "root_path", self->asgi_root_path) == -1) goto error;
+ if(PyDict_SetItemString(scope, "scheme", self->asgi_scheme) == -1) goto error;
+
+ headers = protocol_build_asgi_headers(request);
+ if(!headers || PyDict_SetItemString(scope, "headers", headers) == -1) goto error;
+ Py_CLEAR(headers);
+
+ if(PyDict_SetItemString(scope, "client", self->asgi_client ? self->asgi_client : Py_None) == -1) goto error;
+ if(PyDict_SetItemString(scope, "server", self->asgi_server ? self->asgi_server : Py_None) == -1) goto error;
+
+ if(self->asgi_state && PyDict_Check(self->asgi_state) && PyDict_Size(self->asgi_state)) {
+ state = PyDict_Copy(self->asgi_state);
+ } else {
+ state = PyDict_New();
+ }
+ if(!state || PyDict_SetItemString(scope, "state", state) == -1) goto error;
+ Py_CLEAR(state);
+
+ return scope;
+
+error:
+ Py_XDECREF(scope);
+ Py_XDECREF(headers);
+ Py_XDECREF(state);
+ return NULL;
+}
+
+Protocol* Protocol_handle_asgi_request(Protocol* self, Request* request) {
+ PyObject* scope = NULL;
+ PyObject* body = NULL;
+ PyObject* keep_alive = NULL;
+ PyObject* coro = NULL;
+ PyObject* task = NULL;
+ PyObject* ret = NULL;
+ static PyObject* empty_body = NULL;
+
+ if(request_is_websocket_upgrade(request)) {
+ PyObject* bytes = response_getErrorResponse(
+ 501,
+ "Not Implemented",
+ "WebSocket ASGI is not implemented by this mrhttp build."
+ );
+ if(!bytes) return NULL;
+ Protocol* result = protocol_write_raw_response(self, request, bytes);
+ Py_DECREF(bytes);
+ return result;
+ }
+
+ if(self->request == request) {
+ self->request = (Request*)MrhttpApp_get_request(self->app);
+ }
+
+ if(!(scope = protocol_build_asgi_scope(self, request))) goto error;
+ if(request->body_len) {
+ if(!(body = PyBytes_FromStringAndSize(request->body, request->body_len))) goto error;
+ } else {
+ if(!empty_body && !(empty_body = PyBytes_FromStringAndSize("", 0))) goto error;
+ body = empty_body;
+ Py_INCREF(body);
+ }
+ keep_alive = request->keep_alive ? Py_True : Py_False;
+ Py_INCREF(keep_alive);
+
+ if(!(coro = PyObject_CallFunctionObjArgs(self->asgi_handle, scope, body, keep_alive, NULL))) goto error;
+ if(!(task = PyObject_CallFunctionObjArgs(self->create_task, coro, NULL))) goto error;
+
+ if(!(ret = pipeline_queue(self, (PipelineRequest){true, -1, request, task}))) goto error;
+
+ Py_DECREF(ret);
+ Py_DECREF(task);
+ Py_DECREF(coro);
+ Py_DECREF(keep_alive);
+ Py_DECREF(body);
+ Py_DECREF(scope);
+ return self;
+
+error:
+ Py_XDECREF(ret);
+ Py_XDECREF(task);
+ Py_XDECREF(coro);
+ Py_XDECREF(keep_alive);
+ Py_XDECREF(body);
+ Py_XDECREF(scope);
+ PyErr_Print();
+
+ PyObject* bytes = response_getErrorResponse(
+ 500,
+ "Internal Server Error",
+ "The ASGI server failed before the application could produce a response."
+ );
+ if(!bytes) return NULL;
+ Protocol* result = protocol_write_raw_response(self, request, bytes);
+ Py_DECREF(bytes);
+ return result;
+}
+
Protocol* Protocol_on_body(Protocol* self, char* body, size_t body_len) {
DBG printf("protocol - on body\n");
@@ -437,6 +708,9 @@ Protocol* Protocol_on_body(Protocol* self, char* body, size_t body_len) {
request->transport = self->transport;
request->app = (PyObject*)self->app; //TODO need this?
+ if ( self->asgi_handle ) {
+ return Protocol_handle_asgi_request(self, request);
+ }
// URL: / /json
Route *r = router_getRoute( self->router, self->request );
@@ -723,6 +997,22 @@ static inline Protocol* protocol_write_response(Protocol* self, Request *req, Py
return self;
}
+static inline Protocol* protocol_write_raw_response(Protocol* self, Request *req, PyObject *resp) {
+ if ( !resp || !PyBytes_Check(resp) ) return NULL;
+
+ PyObject *o;
+ if(!(o = PyObject_CallFunctionObjArgs(self->write, resp, NULL))) return NULL;
+ Py_DECREF(o);
+
+ if ( !req->keep_alive && !self->closed ) {
+ Protocol_close(self);
+ }
+
+ if ( req != self->request ) MrhttpApp_release_request( self->app, req );
+ else Request_reset(req);
+ return self;
+}
+
static inline Protocol* protocol_write_redirect_response(Protocol* self, int code, char *url ) {
PyObject *bytes = response_getRedirectResponse( code, url );
if ( !bytes ) return NULL;
@@ -827,6 +1117,21 @@ static void* protocol_pipeline_ready(Protocol* self, PipelineRequest r)
}
self->num_requests_popped++;
+ if ( r.mtype == -1 ) {
+ if ( !PyBytes_Check(response) ) {
+ if ( request != self->request ) MrhttpApp_release_request( self->app, request );
+ protocol_write_error_response(self, 500,"Internal Server Error","The ASGI application did not return a raw HTTP response.");
+ return NULL;
+ }
+ if(!self->closed) {
+ if(!protocol_write_raw_response(self, request, response)) goto error;
+ } else {
+ DBG printf("Connection closed, ASGI response dropped prot %p\n",self);
+ if ( request != self->request ) MrhttpApp_release_request( self->app, request );
+ }
+ goto finally;
+ }
+
//if ( PyBytes_Check( response ) ) {
//response = PyUnicode_FromEncodedObject( response, "utf-8", "strict" );
//printf("WARNING: Page handler should return a string. Bytes object returned from the page handler is being converted to unicode using utf-8\n");
@@ -990,7 +1295,3 @@ void Protocol_timeout_request(Protocol* self) {
}
}
-
-
-
-
diff --git a/src/mrhttp/internals/protocol.h b/src/mrhttp/internals/protocol.h
index 75394aa..b1b8fe6 100644
--- a/src/mrhttp/internals/protocol.h
+++ b/src/mrhttp/internals/protocol.h
@@ -28,6 +28,16 @@ struct Protocol {
PyObject* writelines;
double start_time;
+ // ASGI mode. When asgi_handle is set the parser dispatches requests to
+ // app._asgi_handle(scope, body, keep_alive) instead of the mrhttp router.
+ PyObject* asgi_handle;
+ PyObject* asgi_root_path;
+ PyObject* asgi_scheme;
+ PyObject* asgi_state;
+ PyObject* asgi_spec;
+ PyObject* asgi_client;
+ PyObject* asgi_server;
+
// Async Pipeline
PyObject* create_task;
PyObject* task_done;
@@ -74,10 +84,12 @@ Protocol* Protocol_on_body(Protocol* self, char* body, size_t body_len);
Protocol* Protocol_on_error(Protocol* self, PyObject*);
Protocol* Protocol_handle_request(Protocol* self, Request* request, Route* r);
+Protocol* Protocol_handle_asgi_request(Protocol* self, Request* request);
void Protocol_timeout_request(Protocol* self);
static inline Protocol* protocol_write_response(Protocol* self, Request *req, PyObject* resp);
+static inline Protocol* protocol_write_raw_response(Protocol* self, Request *req, PyObject* resp);
static inline Protocol* protocol_write_redirect_response(Protocol* self, int code, char *url);
static inline Protocol* protocol_write_error_response_bytes( Protocol* self, PyObject* bytes );
static inline Protocol* protocol_write_error_response ( Protocol* self, int code, char *reason, char *msg);
@@ -118,4 +130,3 @@ static inline PyObject* pipeline_get_task(PipelineRequest r)
{
return r.task;
}
-
diff --git a/src/mrhttp/internals/response.c b/src/mrhttp/internals/response.c
index 604ae10..27617d6 100644
--- a/src/mrhttp/internals/response.c
+++ b/src/mrhttp/internals/response.c
@@ -247,4 +247,124 @@ PyObject *response_getErrorResponse(int code, char *reason, char *msg) {
return PyBytes_FromStringAndSize( errbuf, p - errbuf + blen );
}
+static inline size_t uint_digits(size_t n) {
+ size_t digits = 1;
+ while (n >= 10) {
+ n /= 10;
+ digits++;
+ }
+ return digits;
+}
+
+static inline char* append_uint(char *p, size_t n) {
+ size_t digits = uint_digits(n);
+ p += digits;
+ char *end = p;
+ do {
+ *--p = (char)('0' + (n % 10));
+ n /= 10;
+ } while (n);
+ return end;
+}
+
+PyObject* asgi_build_response(PyObject* self, PyObject* args) {
+ PyObject *status_line = NULL, *headers = NULL, *body = NULL, *date = NULL;
+ int keep_alive, head_response, has_content_length, has_server, has_date, has_connection;
+
+ if(!PyArg_ParseTuple(
+ args,
+ "OOOOiiiiii",
+ &status_line,
+ &headers,
+ &body,
+ &date,
+ &keep_alive,
+ &head_response,
+ &has_content_length,
+ &has_server,
+ &has_date,
+ &has_connection
+ )) return NULL;
+
+ if(!PyBytes_Check(status_line) || !PyList_Check(headers) || !PyBytes_Check(body) || !PyBytes_Check(date)) {
+ PyErr_SetString(PyExc_TypeError, "invalid ASGI response builder arguments");
+ return NULL;
+ }
+
+ char *status_ptr, *body_ptr, *date_ptr;
+ Py_ssize_t status_len, body_len, date_len;
+ if(PyBytes_AsStringAndSize(status_line, &status_ptr, &status_len) == -1) return NULL;
+ if(PyBytes_AsStringAndSize(body, &body_ptr, &body_len) == -1) return NULL;
+ if(PyBytes_AsStringAndSize(date, &date_ptr, &date_len) == -1) return NULL;
+
+ Py_ssize_t num_headers = PyList_GET_SIZE(headers);
+ size_t total = (size_t)status_len + 2;
+ size_t send_body_len = head_response ? 0 : (size_t)body_len;
+
+ for(Py_ssize_t i = 0; i < num_headers; i++) {
+ PyObject* item = PyList_GET_ITEM(headers, i);
+ if(!PyTuple_Check(item) || PyTuple_GET_SIZE(item) != 2) {
+ PyErr_SetString(PyExc_TypeError, "ASGI response headers must be two-item tuples");
+ return NULL;
+ }
+ PyObject* name = PyTuple_GET_ITEM(item, 0);
+ PyObject* value = PyTuple_GET_ITEM(item, 1);
+ if(!PyBytes_Check(name) || !PyBytes_Check(value)) {
+ PyErr_SetString(PyExc_TypeError, "ASGI response header names and values must be bytes");
+ return NULL;
+ }
+ total += (size_t)PyBytes_GET_SIZE(name) + 2 + (size_t)PyBytes_GET_SIZE(value) + 2;
+ }
+
+ if(!has_content_length) total += strlen("content-length: \r\n") + uint_digits((size_t)body_len);
+ if(!has_server) total += strlen("server: MrHTTP/0.13\r\n");
+ if(!has_date) total += strlen("date: \r\n") + (size_t)date_len;
+ if(!keep_alive && !has_connection) total += strlen("connection: close\r\n");
+ total += send_body_len;
+
+ PyObject* response = PyBytes_FromStringAndSize(NULL, total);
+ if(!response) return NULL;
+
+ char* p = PyBytes_AS_STRING(response);
+ memcpy(p, status_ptr, (size_t)status_len); p += status_len;
+
+ for(Py_ssize_t i = 0; i < num_headers; i++) {
+ PyObject* item = PyList_GET_ITEM(headers, i);
+ PyObject* name = PyTuple_GET_ITEM(item, 0);
+ PyObject* value = PyTuple_GET_ITEM(item, 1);
+ char *name_ptr, *value_ptr;
+ Py_ssize_t name_len, value_len;
+ if(PyBytes_AsStringAndSize(name, &name_ptr, &name_len) == -1) { Py_DECREF(response); return NULL; }
+ if(PyBytes_AsStringAndSize(value, &value_ptr, &value_len) == -1) { Py_DECREF(response); return NULL; }
+ memcpy(p, name_ptr, (size_t)name_len); p += name_len;
+ *p++ = ':'; *p++ = ' ';
+ memcpy(p, value_ptr, (size_t)value_len); p += value_len;
+ *p++ = '\r'; *p++ = '\n';
+ }
+
+ if(!has_content_length) {
+ memcpy(p, "content-length: ", 16); p += 16;
+ p = append_uint(p, (size_t)body_len);
+ *p++ = '\r'; *p++ = '\n';
+ }
+ if(!has_server) {
+ memcpy(p, "server: MrHTTP/0.13\r\n", 21); p += 21;
+ }
+ if(!has_date) {
+ memcpy(p, "date: ", 6); p += 6;
+ memcpy(p, date_ptr, (size_t)date_len); p += date_len;
+ *p++ = '\r'; *p++ = '\n';
+ }
+ if(!keep_alive && !has_connection) {
+ memcpy(p, "connection: close\r\n", 19); p += 19;
+ }
+
+ *p++ = '\r'; *p++ = '\n';
+ if(send_body_len) {
+ memcpy(p, body_ptr, send_body_len); p += send_body_len;
+ }
+
+ return response;
+}
+
diff --git a/src/mrhttp/internals/response.h b/src/mrhttp/internals/response.h
index 7d51a2e..9e0efcb 100644
--- a/src/mrhttp/internals/response.h
+++ b/src/mrhttp/internals/response.h
@@ -52,6 +52,7 @@ int response_add_cookies( Response *self, char *p );
PyObject* response_getRedirectResponse( int code, char *url );
PyObject* response_getErrorResponse( int code, char *reason, char *msg );
+PyObject* asgi_build_response(PyObject* self, PyObject* args);
void response_setupResponseBuffer(void);
char *getResponseBuffer(int sz);
diff --git a/src/mrhttp/request.py b/src/mrhttp/request.py
index 83a33c5..2401d7a 100755
--- a/src/mrhttp/request.py
+++ b/src/mrhttp/request.py
@@ -3,7 +3,10 @@ import cgi, time
import encodings.idna
import collections
import mrhttp
-import mrpacker
+try:
+ import mrpacker
+except ImportError:
+ mrpacker = None
try:
import mrjson as json
except:
@@ -85,6 +88,8 @@ class Request(mrhttp.CRequest):
return self._files
def set_usermrp(self, j):
+ if mrpacker is None:
+ return
try:
self.user = mrpacker.unpack(j)
except:
@@ -102,4 +107,3 @@ class Request(mrhttp.CRequest):
#print("Error parsing json: ", j)
-
diff --git a/tests/s_asgi.py b/tests/s_asgi.py
new file mode 100644
index 0000000..271c7b2
--- /dev/null
+++ b/tests/s_asgi.py
@@ -0,0 +1,55 @@
+import mrhttp
+
+
+startup_seen = False
+shutdown_seen = False
+
+
+async def app(scope, receive, send):
+ global startup_seen, shutdown_seen
+
+ if scope["type"] == "lifespan":
+ while True:
+ message = await receive()
+ if message["type"] == "lifespan.startup":
+ startup_seen = True
+ await send({"type": "lifespan.startup.complete"})
+ elif message["type"] == "lifespan.shutdown":
+ shutdown_seen = True
+ await send({"type": "lifespan.shutdown.complete"})
+ return
+
+ assert scope["type"] == "http"
+ message = await receive()
+ body = message.get("body", b"")
+
+ if scope["path"] == "/":
+ content = b"hello asgi"
+ elif scope["path"] == "/echo":
+ content = body
+ elif scope["path"] == "/scope":
+ headers = scope["headers"]
+ dup_count = len([h for h in headers if h[0] == b"x-dup"])
+ content = "|".join([
+ scope["method"],
+ scope["path"],
+ scope["raw_path"].decode("ascii"),
+ scope["query_string"].decode("ascii"),
+ str(dup_count),
+ str(startup_seen),
+ ]).encode("utf-8")
+ else:
+ await send({"type": "http.response.start", "status": 404, "headers": []})
+ await send({"type": "http.response.body", "body": b"missing"})
+ return
+
+ await send({
+ "type": "http.response.start",
+ "status": 200,
+ "headers": [(b"content-type", b"text/plain")],
+ })
+ await send({"type": "http.response.body", "body": content[:3], "more_body": True})
+ await send({"type": "http.response.body", "body": content[3:]})
+
+
+mrhttp.run(app, host="127.0.0.1", port=8080, workers=1, lifespan="on")
diff --git a/tests/s_fastapi.py b/tests/s_fastapi.py
new file mode 100644
index 0000000..5e2ce45
--- /dev/null
+++ b/tests/s_fastapi.py
@@ -0,0 +1,9 @@
+import mrhttp
+from micropie import App
+
+class Root(App):
+ async def index(self):
+ return {"ok": True}
+
+app = Root()
+mrhttp.run(app, host="127.0.0.1", port=8080, workers=8, lifespan="off")
diff --git a/tests/s_fastapi_smoke.py b/tests/s_fastapi_smoke.py
new file mode 100644
index 0000000..f40fd87
--- /dev/null
+++ b/tests/s_fastapi_smoke.py
@@ -0,0 +1,20 @@
+import mrhttp
+from fastapi import FastAPI, Request
+from fastapi.responses import PlainTextResponse
+
+
+app = FastAPI()
+
+
[email protected]("/")
+async def index():
+ return {"ok": True}
+
+
[email protected]("/echo")
+async def echo(request: Request):
+ body = await request.body()
+ return PlainTextResponse(body)
+
+
+mrhttp.run(app, host="127.0.0.1", port=8080, workers=1, lifespan="on")
diff --git a/tests/test_asgi.py b/tests/test_asgi.py
new file mode 100644
index 0000000..5a570eb
--- /dev/null
+++ b/tests/test_asgi.py
@@ -0,0 +1,84 @@
+import socket
+
+import requests
+
+import tests.common
+from tests.common import eq, stop_server
+
+
+server = None
+
+
+def setup():
+ global server
+ server = tests.common.start_server("tests/s_asgi.py")
+ if not server:
+ return 1
+ return 0
+
+
+def setup_module(module):
+ assert setup() == 0
+
+
+def test_asgi_http():
+ r = requests.get("http://localhost:8080/")
+ eq(r.status_code, 200)
+ eq(r.text, "hello asgi")
+ eq(r.headers["content-type"], "text/plain")
+
+ r = requests.post("http://localhost:8080/echo", data=b"posted")
+ eq(r.status_code, 200)
+ eq(r.content, b"posted")
+
+ sock = socket.create_connection(("127.0.0.1", 8080), timeout=3)
+ try:
+ sock.sendall(
+ b"GET /scope?x=1 HTTP/1.1\r\n"
+ b"Host: localhost\r\n"
+ b"X-Dup: one\r\n"
+ b"X-Dup: two\r\n"
+ b"\r\n"
+ )
+ data = sock.recv(4096)
+ assert b"\r\n\r\nGET|/scope|/scope|x=1|2|True" in data, data
+ finally:
+ sock.close()
+
+
+def test_chunked_request_body():
+ sock = socket.create_connection(("127.0.0.1", 8080), timeout=3)
+ try:
+ sock.sendall(
+ b"POST /echo HTTP/1.1\r\n"
+ b"Host: localhost\r\n"
+ b"Transfer-Encoding: chunked\r\n"
+ b"\r\n"
+ b"4\r\nWiki\r\n"
+ b"5\r\npedia\r\n"
+ b"0\r\n\r\n"
+ )
+ data = sock.recv(4096)
+ assert b"\r\n\r\nWikipedia" in data, data
+ finally:
+ sock.close()
+
+
+def teardown():
+ global server
+ proc = server
+ server = None
+ stop_server(proc)
+ if proc is not None:
+ try:
+ proc.wait(timeout=2)
+ except Exception:
+ pass
+ if proc.stdout:
+ proc.stdout.close()
+ if proc.stderr:
+ proc.stderr.close()
+
+
+def teardown_module(module):
+ teardown()
diff --git a/tests/test_asgi_frameworks.py b/tests/test_asgi_frameworks.py
new file mode 100644
index 0000000..4046c95
--- /dev/null
+++ b/tests/test_asgi_frameworks.py
@@ -0,0 +1,55 @@
+import importlib.util
+
+import pytest
+import requests
+
+import tests.common
+from tests.common import eq, stop_server
+
+
+server = None
+
+
+def setup():
+ if importlib.util.find_spec("fastapi") is None:
+ pytest.skip("fastapi is not installed")
+
+ global server
+ server = tests.common.start_server("tests/s_fastapi_smoke.py")
+ if not server:
+ return 1
+ return 0
+
+
+def setup_module(module):
+ assert setup() == 0
+
+
+def test_fastapi_http():
+ r = requests.get("http://localhost:8080/")
+ eq(r.status_code, 200)
+ eq(r.json(), {"ok": True})
+
+ r = requests.post("http://localhost:8080/echo", data=b"fastapi")
+ eq(r.status_code, 200)
+ eq(r.text, "fastapi")
+
+
+def teardown():
+ global server
+ proc = server
+ server = None
+ stop_server(proc)
+ if proc is not None:
+ try:
+ proc.wait(timeout=2)
+ except Exception:
+ pass
+ if proc.stdout:
+ proc.stdout.close()
+ if proc.stderr:
+ proc.stderr.close()
+
+
+def teardown_module(module):
+ teardown()