patx/micropie
added file upload (multipart parasing) for post requests
Commit d936bb9 · patx · 2025-01-26T13:12:26-05:00
Comments
No comments yet.
Diff
diff --git a/MicroPie.py b/MicroPie.py
index 59d7e54..1d5664e 100644
--- a/MicroPie.py
+++ b/MicroPie.py
@@ -220,7 +220,6 @@ class Server:
- If your route returns (status, body, extra_headers), we handle them
in a single call to start_response.
- Do NOT call `start_response` in your handler.
-
"""
self.environ = environ
self.start_response = start_response
@@ -290,12 +289,24 @@ class Server:
self.request = method
self.body_params = {}
+ self.files = {} # Initialize files dictionary
if method == "POST":
try:
+ # Determine the content type
+ content_type = environ.get("CONTENT_TYPE", "")
content_length = int(environ.get("CONTENT_LENGTH", 0) or 0)
- body = environ["wsgi.input"].read(content_length).decode("utf-8", "ignore")
- self.body_params = parse_qs(body)
+
+ # Read the request body
+ body = environ['wsgi.input'].read(content_length)
+
+ if 'multipart/form-data' in content_type:
+ # Parse multipart/form-data
+ self.parse_multipart(body, content_type)
+ else:
+ # Handle application/x-www-form-urlencoded
+ body_str = body.decode("utf-8", "ignore")
+ self.body_params = parse_qs(body_str)
except Exception as e:
start_response("400 Bad Request", [("Content-Type", "text/html")])
return [f"400 Bad Request: {str(e)}".encode("utf-8")]
@@ -311,6 +322,10 @@ class Server:
func_args.append(self.query_params[param.name][0])
elif param.name in self.body_params:
func_args.append(self.body_params[param.name][0])
+ elif param.name in self.files:
+ func_args.append(self.files[param.name])
+ elif param.name in self.session:
+ func_args.append(self.session[param.name])
elif param.default is not param.empty:
func_args.append(param.default)
else:
@@ -373,3 +388,83 @@ class Server:
except:
pass
return [b"500 Internal Server Error"]
+
+ def parse_multipart(self, body, content_type):
+ """
+ Custom parser for multipart/form-data.
+
+ Parameters:
+ - body: The raw request body as bytes.
+ - content_type: The Content-Type header value.
+ """
+ # Extract boundary
+ boundary = None
+ parts = content_type.split(";")
+ for part in parts:
+ part = part.strip()
+ if part.startswith("boundary="):
+ boundary = part.split("=", 1)[1]
+ break
+
+ if not boundary:
+ raise ValueError("Boundary not found in Content-Type header.")
+
+ boundary_bytes = boundary.encode("utf-8")
+ # The boundary might be prefixed with '--' in the actual data
+ delimiter = b'--' + boundary_bytes
+ end_delimiter = b'--' + boundary_bytes + b'--'
+
+ # Split the body by the delimiter
+ sections = body.split(delimiter)
+ for section in sections:
+ if not section or section == b'--' or section == b'--\r\n':
+ continue
+ if section.startswith(b'\r\n'):
+ section = section[2:]
+ if section.endswith(b'\r\n'):
+ section = section[:-2]
+ if section == b'--':
+ continue
+
+ # Split headers and content
+ try:
+ headers, content = section.split(b'\r\n\r\n', 1)
+ except ValueError:
+ continue # Invalid section, skip
+
+ headers = headers.decode("utf-8", "ignore").split("\r\n")
+ header_dict = {}
+ for header in headers:
+ if ':' in header:
+ key, value = header.split(':', 1)
+ header_dict[key.strip().lower()] = value.strip()
+
+ # Parse Content-Disposition
+ disposition = header_dict.get("content-disposition", "")
+ disposition_parts = disposition.split(";")
+ disposition_dict = {}
+ for disp_part in disposition_parts:
+ if "=" in disp_part:
+ key, value = disp_part.strip().split("=", 1)
+ disposition_dict[key] = value.strip('"')
+
+ name = disposition_dict.get("name")
+ filename = disposition_dict.get("filename")
+
+ if filename:
+ # It's a file upload
+ file_content_type = header_dict.get("content-type", "application/octet-stream")
+ file_data = content
+ self.files[name] = {
+ 'filename': filename,
+ 'content_type': file_content_type,
+ 'data': file_data
+ }
+ elif name:
+ # It's a regular form field
+ value = content.decode("utf-8", "ignore")
+ if name in self.body_params:
+ self.body_params[name].append(value)
+ else:
+ self.body_params[name] = [value]
+
diff --git a/examples/file_uploads/app.py b/examples/file_uploads/app.py
new file mode 100644
index 0000000..afd4cd6
--- /dev/null
+++ b/examples/file_uploads/app.py
@@ -0,0 +1,58 @@
+from MicroPie import Server
+import os, uuid
+
+class Root(Server):
+
+ def upload_file(self, file):
+ """
+ Handler function to process uploaded files.
+
+ Parameters:
+ - file: A dictionary containing 'filename', 'content_type', and 'data'.
+ """
+ if not file:
+ return 400, "No file uploaded."
+
+ filename = file['filename']
+ content_type = file['content_type']
+ data = file['data']
+
+ # Ensure the 'uploads' directory exists
+ upload_dir = 'uploads'
+ os.makedirs(upload_dir, exist_ok=True)
+
+ # Sanitize the filename to prevent directory traversal attacks
+ filename = os.path.basename(filename)
+
+ # Optionally, generate a unique filename to prevent overwriting
+ unique_filename = f"{uuid.uuid4()}_{filename}"
+ upload_path = os.path.join(upload_dir, unique_filename)
+
+ try:
+ with open(upload_path, 'wb') as f:
+ f.write(data)
+ except IOError as e:
+ return 500, f"Failed to save file: {str(e)}"
+
+ return 200, f"""File '{filename}' uploaded successfully as '{unique_filename}'. <a href="/">Upload another</a>"""
+
+ # Example Index Handler to Render Upload Form
+ def index(self):
+ """
+ Render a simple HTML form for file uploads.
+ """
+ return (
+ "<!DOCTYPE html>"
+ "<html>"
+ "<head><title>Upload File</title></head>"
+ "<body>"
+ "<h1>Upload a File</h1>"
+ "<form action='/upload_file' method='post' enctype='multipart/form-data'>"
+ "<input type='file' name='file' required>"
+ "<button type='submit'>Upload</button>"
+ "</form>"
+ "</body>"
+ "</html>"
+ )
+
+Root().run()