patx/micropie
added heroku ready webrtc example
Commit d943cd3 · patx · 2025-12-29T00:47:38-05:00
Comments
No comments yet.
Diff
diff --git a/examples/socketio/webtrc/advanced b/examples/socketio/webtrc/advanced
deleted file mode 160000
index f5b5c7e..0000000
--- a/examples/socketio/webtrc/advanced
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit f5b5c7eb4d6cc189606815cd84d798e0b2c4df6a
diff --git a/examples/socketio/webtrc/heroku_ready/Procfile b/examples/socketio/webtrc/heroku_ready/Procfile
new file mode 100644
index 0000000..c76b7d3
--- /dev/null
+++ b/examples/socketio/webtrc/heroku_ready/Procfile
@@ -0,0 +1,2 @@
+web: uvicorn app:app --host=0.0.0.0 --port=${PORT} --workers 4
+
diff --git a/examples/socketio/webtrc/heroku_ready/app.py b/examples/socketio/webtrc/heroku_ready/app.py
new file mode 100644
index 0000000..5108683
--- /dev/null
+++ b/examples/socketio/webtrc/heroku_ready/app.py
@@ -0,0 +1,258 @@
+import os
+import time
+import socketio
+from micropie import App
+from mongokv import Mkv
+
+# ---------------- Config ----------------
+ALLOWED_ORIGINS = [
+ "http://localhost:8000",
+ "http://127.0.0.1:8000", # add your domain here
+]
+
+MONGO_URI = "mongodb://localhost:27017"
+presence = Mkv(MONGO_URI, db_name="presence", collection_name="presence")
+
+# Presence TTL: streamer is considered offline if no heartbeat within this window
+STREAMER_TTL_SECONDS = 45
+
+sio = socketio.AsyncServer(
+ async_mode="asgi",
+ cors_allowed_origins=ALLOWED_ORIGINS,
+ ping_interval=25,
+ ping_timeout=60,
+)
+
+
+class MyApp(App):
+ async def index(self):
+ return "Use /stream/<username> or /watch/<username>"
+
+ async def stream(self, username: str):
+ return await self._render_template("stream.html", username=username)
+
+ async def watch(self, username: str):
+ return await self._render_template("watch.html", username=username)
+
+
+# ---------------- Presence helpers (mongoKV) ----------------
+def _k_sid(username: str) -> str:
+ return f"streamer_sid:{username}"
+
+def _k_seen(username: str) -> str:
+ return f"streamer_seen:{username}"
+
+def _k_user_by_sid(sid: str) -> str:
+ return f"streamer_user_by_sid:{sid}"
+
+def _k_token(username: str) -> str:
+ return f"streamer_token:{username}"
+
+
+async def _now() -> int:
+ return int(time.time())
+
+
+async def _get_streamer_record(username: str) -> tuple[str | None, int | None]:
+ try:
+ sid = await presence.get(_k_sid(username))
+ seen = await presence.get(_k_seen(username))
+ return sid, seen
+ except KeyError:
+ return None, None
+
+
+async def get_streamer_sid_if_fresh(username: str) -> str | None:
+ sid, seen = await _get_streamer_record(username)
+ if not sid or not isinstance(seen, int):
+ return None
+ if (await _now()) - seen > STREAMER_TTL_SECONDS:
+ return None
+ return sid
+
+
+async def clear_streamer(username: str):
+ for key in (_k_sid(username), _k_seen(username), _k_token(username)):
+ try:
+ await presence.delete(key)
+ except Exception:
+ pass
+
+
+async def streamer_still_owner(username: str, sid: str) -> bool:
+ current = await get_streamer_sid_if_fresh(username)
+ return current == sid
+
+
+async def claim_stream_username(username: str, sid: str, stream_token: str | None) -> tuple[bool, str | None]:
+ """
+ Returns (ok, reason).
+ - If a fresh streamer exists and token doesn't match -> deny "taken"
+ - If token matches (same browser rejoin), allow takeover
+ - If no fresh streamer exists, claim it
+ """
+ existing_sid = await get_streamer_sid_if_fresh(username)
+
+ stored_token = None
+ try:
+ stored_token = await presence.get(_k_token(username))
+ except KeyError:
+ stored_token = None
+
+ # Someone is live and it's not us: only allow if token matches
+ if existing_sid and existing_sid != sid:
+ if not stored_token or not stream_token or stored_token != stream_token:
+ return False, "taken"
+
+ now = await _now()
+ await presence.set(_k_sid(username), sid)
+ await presence.set(_k_seen(username), now)
+ await presence.set(_k_user_by_sid(sid), username)
+
+ # Bind token on first claim
+ if stream_token and (stored_token is None):
+ await presence.set(_k_token(username), stream_token)
+
+ return True, None
+
+
+# ---------------- Socket.IO Events ----------------
[email protected]
+async def connect(sid, environ):
+ print(f"[connect] {sid}")
+
+
[email protected]
+async def disconnect(sid):
+ print(f"[disconnect] {sid}")
+ # cleanup if this sid was a streamer
+ try:
+ username = await presence.get(_k_user_by_sid(sid))
+ except KeyError:
+ return
+
+ if await streamer_still_owner(username, sid):
+ await clear_streamer(username)
+
+ try:
+ await presence.delete(_k_user_by_sid(sid))
+ except Exception:
+ pass
+
+
[email protected]("join_room")
+async def join_room(sid, data):
+ username = (data or {}).get("username")
+ role = (data or {}).get("role") # "streamer" or "watcher"
+ stream_token = (data or {}).get("streamToken")
+ if not username:
+ return
+
+ await sio.enter_room(sid, username)
+
+ if role == "streamer":
+ ok, reason = await claim_stream_username(username, sid, stream_token)
+ if not ok:
+ await sio.emit("stream_denied", {"username": username, "reason": reason}, to=sid)
+ await sio.leave_room(sid, username)
+ await sio.disconnect(sid)
+ print(f"[join_room] DENIED streamer '{username}' to {sid} reason={reason}")
+ return
+
+ await sio.emit("stream_accepted", {"username": username}, to=sid)
+ print(f"[join_room] STREAMER {sid} claimed '{username}'")
+
+ else:
+ print(f"[join_room] WATCHER {sid} joined '{username}'")
+
+
[email protected]("streamer_heartbeat")
+async def streamer_heartbeat(sid, data):
+ username = (data or {}).get("username")
+ if not username:
+ return
+ if await streamer_still_owner(username, sid):
+ await presence.set(_k_seen(username), await _now())
+
+
[email protected]("new_watcher")
+async def new_watcher(sid, data):
+ print(f"[new_watcher] raw sid={sid} data={data}")
+ username = (data or {}).get("username")
+ if not username:
+ return
+
+ streamer_sid = await get_streamer_sid_if_fresh(username)
+ print(f"[new_watcher] watcher {sid} wants '{username}', streamer={streamer_sid}")
+
+ if streamer_sid:
+ await sio.emit("new_watcher", {"watcherSid": sid}, to=streamer_sid)
+ else:
+ await sio.emit("stream_offline", {"username": username}, to=sid)
+
+
[email protected]("offer")
+async def handle_offer(sid, data):
+ username = (data or {}).get("username")
+ watcher_sid = (data or {}).get("watcherSid")
+ offer_sdp = (data or {}).get("offer")
+ offer_type = (data or {}).get("offerType", "offer")
+
+ # Only current owner can send offers for this username
+ if not username or not await streamer_still_owner(username, sid):
+ await sio.emit("stream_denied", {"username": username, "reason": "not_owner"}, to=sid)
+ return
+
+ if watcher_sid:
+ await sio.emit(
+ "offer",
+ {"offer": offer_sdp, "offerType": offer_type, "streamerSid": sid},
+ to=watcher_sid,
+ )
+
+
[email protected]("answer")
+async def handle_answer(sid, data):
+ streamer_sid = (data or {}).get("streamerSid")
+ answer_sdp = (data or {}).get("answer")
+ answer_type = (data or {}).get("answerType", "answer")
+
+ if streamer_sid:
+ await sio.emit(
+ "answer",
+ {"answer": answer_sdp, "answerType": answer_type, "watcherSid": sid},
+ to=streamer_sid,
+ )
+
+
[email protected]("ice-candidate")
+async def handle_ice_candidate(sid, data):
+ target_sid = (data or {}).get("targetSid")
+ candidate = (data or {}).get("candidate")
+ sdp_mid = (data or {}).get("sdpMid")
+ sdp_mline_index = (data or {}).get("sdpMLineIndex")
+
+ # Optional hardening: streamer ownership check
+ username = (data or {}).get("username")
+ role = (data or {}).get("role")
+ if role == "streamer" and username:
+ if not await streamer_still_owner(username, sid):
+ await sio.emit("stream_denied", {"username": username, "reason": "not_owner"}, to=sid)
+ return
+
+ if target_sid:
+ await sio.emit(
+ "ice-candidate",
+ {
+ "candidate": candidate,
+ "sdpMid": sdp_mid,
+ "sdpMLineIndex": sdp_mline_index,
+ "senderSid": sid,
+ },
+ to=target_sid,
+ )
+
+
+asgi_app = MyApp()
+app = socketio.ASGIApp(sio, other_asgi_app=asgi_app)
+
diff --git a/examples/socketio/webtrc/heroku_ready/requirements.txt b/examples/socketio/webtrc/heroku_ready/requirements.txt
new file mode 100644
index 0000000..6481c15
--- /dev/null
+++ b/examples/socketio/webtrc/heroku_ready/requirements.txt
@@ -0,0 +1,4 @@
+micropie[all]
+python-socketio
+mongokv
+
diff --git a/examples/socketio/webtrc/heroku_ready/templates/stream.html b/examples/socketio/webtrc/heroku_ready/templates/stream.html
new file mode 100644
index 0000000..eb520d8
--- /dev/null
+++ b/examples/socketio/webtrc/heroku_ready/templates/stream.html
@@ -0,0 +1,166 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Your Stream</title>
+ <meta charset="UTF-8"/>
+ <meta name="viewport" content="width=device-width,initial-scale=1.0"/>
+ <script src="https://cdn.socket.io/4.5.4/socket.io.min.js"></script>
+ <style>
+ body { font-family: system-ui, -apple-system, Segoe UI, Roboto, sans-serif; padding: 16px; }
+ #status { opacity: .8; margin: 8px 0 16px; }
+ video { width: min(720px, 100%); border-radius: 12px; background: #111; }
+ </style>
+</head>
+<body>
+ <h1>Streaming as {{ username }}</h1>
+ <div id="status">Connecting…</div>
+
+ <video id="localVideo" autoplay playsinline muted style="transform: scaleX(-1);"></video>
+
+ <script>
+ const username = "{{ username }}";
+
+ // Token allows same browser to refresh and reclaim the username without waiting TTL
+ const tokenKey = `stream_token:${username}`;
+ let streamToken = localStorage.getItem(tokenKey);
+ if (!streamToken) {
+ streamToken = crypto.randomUUID();
+ localStorage.setItem(tokenKey, streamToken);
+ }
+
+ const socket = io({ transports: ["websocket"] });
+
+ const statusEl = document.getElementById("status");
+ const peerConnections = {};
+
+ let localStream = null;
+ let heartbeatTimer = null;
+ let accepted = false;
+
+ socket.on("connect", () => {
+ console.log("[Streamer] Connected:", socket.id);
+ statusEl.textContent = "Connected. Claiming username…";
+ socket.emit("join_room", { username, role: "streamer", streamToken });
+ });
+
+ socket.on("disconnect", () => {
+ statusEl.textContent = "Disconnected.";
+ accepted = false;
+ if (heartbeatTimer) clearInterval(heartbeatTimer);
+ heartbeatTimer = null;
+ });
+
+ socket.on("stream_accepted", async () => {
+ accepted = true;
+ statusEl.textContent = "Live ✅ Starting camera…";
+ await startLocalCamera();
+
+ heartbeatTimer = setInterval(() => {
+ socket.emit("streamer_heartbeat", { username });
+ }, 20000);
+
+ statusEl.textContent = "Live ✅ Waiting for viewers…";
+ });
+
+ socket.on("stream_denied", (msg) => {
+ const reason = msg?.reason || "denied";
+ console.warn("[Streamer] stream_denied", msg);
+
+ if (reason === "taken") {
+ statusEl.textContent = `Username "${username}" is already live.`;
+ alert(`Can't stream as "${username}" — already taken.`);
+ } else {
+ statusEl.textContent = `Streaming denied: ${reason}`;
+ alert(`Streaming denied: ${reason}`);
+ }
+ cleanupLocal();
+ });
+
+ async function startLocalCamera() {
+ if (localStream) return;
+ try {
+ localStream = await navigator.mediaDevices.getUserMedia({ video: true, audio: true });
+ document.getElementById("localVideo").srcObject = localStream;
+ } catch (err) {
+ console.error("Error accessing camera:", err);
+ statusEl.textContent = "Camera/mic permission error.";
+ }
+ }
+
+ function cleanupLocal() {
+ try {
+ if (localStream) localStream.getTracks().forEach(t => t.stop());
+ } catch (e) {}
+ localStream = null;
+
+ Object.values(peerConnections).forEach(pc => { try { pc.close(); } catch (e) {} });
+ for (const k of Object.keys(peerConnections)) delete peerConnections[k];
+
+ if (heartbeatTimer) clearInterval(heartbeatTimer);
+ heartbeatTimer = null;
+ }
+
+ socket.on("new_watcher", (data) => {
+ if (!accepted) return;
+ if (!localStream) return;
+
+ const watcherSid = data.watcherSid;
+ console.log("[Streamer] new_watcher:", watcherSid);
+ createOfferForWatcher(watcherSid);
+ });
+
+ async function createOfferForWatcher(watcherSid) {
+ const pc = new RTCPeerConnection({
+ iceServers: [{ urls: "stun:stun.l.google.com:19302" }]
+ });
+ peerConnections[watcherSid] = pc;
+
+ localStream.getTracks().forEach((track) => pc.addTrack(track, localStream));
+
+ pc.onicecandidate = (event) => {
+ if (event.candidate) {
+ socket.emit("ice-candidate", {
+ username,
+ role: "streamer",
+ candidate: event.candidate.candidate,
+ sdpMid: event.candidate.sdpMid,
+ sdpMLineIndex: event.candidate.sdpMLineIndex,
+ targetSid: watcherSid
+ });
+ }
+ };
+
+ const offer = await pc.createOffer();
+ await pc.setLocalDescription(offer);
+
+ socket.emit("offer", {
+ username,
+ offer: offer.sdp,
+ offerType: offer.type,
+ watcherSid
+ });
+ }
+
+ socket.on("answer", async (data) => {
+ const { answer, answerType, watcherSid } = data;
+ const pc = peerConnections[watcherSid];
+ if (!pc) return;
+
+ await pc.setRemoteDescription(new RTCSessionDescription({
+ type: answerType,
+ sdp: answer
+ }));
+ });
+
+ socket.on("ice-candidate", (data) => {
+ const { candidate, sdpMid, sdpMLineIndex, senderSid } = data;
+ const pc = peerConnections[senderSid];
+ if (!pc || !candidate) return;
+
+ pc.addIceCandidate(new RTCIceCandidate({ candidate, sdpMid, sdpMLineIndex }))
+ .catch(err => console.error("Error adding ICE candidate:", err));
+ });
+ </script>
+</body>
+</html>
+
diff --git a/examples/socketio/webtrc/heroku_ready/templates/watch.html b/examples/socketio/webtrc/heroku_ready/templates/watch.html
new file mode 100644
index 0000000..38cb71b
--- /dev/null
+++ b/examples/socketio/webtrc/heroku_ready/templates/watch.html
@@ -0,0 +1,137 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Watching {{ username }}</title>
+ <meta charset="UTF-8"/>
+ <meta name="viewport" content="width=device-width,initial-scale=1.0"/>
+ <script src="https://cdn.socket.io/4.5.4/socket.io.min.js"></script>
+ <style>
+ body { font-family: system-ui, -apple-system, Segoe UI, Roboto, sans-serif; padding: 16px; }
+ #status { opacity: .8; margin: 8px 0 16px; }
+ video { width: min(900px, 100%); border-radius: 12px; background: #111; }
+ .hint { font-size: 14px; opacity: .75; margin-top: 10px; }
+ </style>
+</head>
+<body>
+ <h1>Watching {{ username }}</h1>
+ <div id="status">Connecting…</div>
+
+ <video
+ id="remoteVideo"
+ playsinline
+ autoplay
+ controls
+ style="transform: scaleX(-1);">
+ </video>
+
+ <div class="hint">
+ If you don’t hear audio, click the video and press play (browser autoplay rules).
+ </div>
+
+ <script>
+ const username = "{{ username }}";
+ const socket = io({ transports: ["websocket"] });
+
+ let peerConnection = null;
+ let currentStreamerSid = null;
+
+ const statusEl = document.getElementById("status");
+ const remoteVideo = document.getElementById("remoteVideo");
+
+ function cleanupPeer() {
+ try { if (peerConnection) peerConnection.close(); } catch (e) {}
+ peerConnection = null;
+ currentStreamerSid = null;
+ try { remoteVideo.srcObject = null; } catch (e) {}
+ }
+
+ socket.on("connect", () => {
+ console.log("[Watcher] Connected:", socket.id);
+ statusEl.textContent = "Connected. Looking for stream…";
+
+ socket.emit("join_room", { username, role: "watcher" });
+
+ setTimeout(() => {
+ console.log("[Watcher] emitting new_watcher");
+ socket.emit("new_watcher", { username });
+ }, 250);
+ });
+
+ socket.on("disconnect", () => {
+ statusEl.textContent = "Disconnected.";
+ cleanupPeer();
+ });
+
+ socket.on("stream_offline", () => {
+ statusEl.textContent = `@${username} is offline.`;
+ cleanupPeer();
+ });
+
+ socket.on("offer", async (data) => {
+ console.log("[Watcher] Offer from:", data.streamerSid);
+ statusEl.textContent = "Receiving stream…";
+ currentStreamerSid = data.streamerSid;
+
+ if (!peerConnection) {
+ peerConnection = new RTCPeerConnection({
+ iceServers: [{ urls: "stun:stun.l.google.com:19302" }]
+ });
+
+ peerConnection.onicecandidate = (event) => {
+ if (event.candidate && currentStreamerSid) {
+ socket.emit("ice-candidate", {
+ candidate: event.candidate.candidate,
+ sdpMid: event.candidate.sdpMid,
+ sdpMLineIndex: event.candidate.sdpMLineIndex,
+ targetSid: currentStreamerSid
+ });
+ }
+ };
+
+ peerConnection.ontrack = (event) => {
+ const stream = event.streams && event.streams[0];
+ if (!stream) return;
+
+ remoteVideo.srcObject = stream;
+
+ remoteVideo.play().then(() => {
+ statusEl.textContent = "Live ✅";
+ }).catch(() => {
+ statusEl.textContent = "Live (press play)";
+ });
+ };
+ }
+
+ try {
+ await peerConnection.setRemoteDescription(new RTCSessionDescription({
+ type: data.offerType,
+ sdp: data.offer
+ }));
+
+ const answer = await peerConnection.createAnswer();
+ await peerConnection.setLocalDescription(answer);
+
+ socket.emit("answer", {
+ answer: answer.sdp,
+ answerType: answer.type,
+ streamerSid: data.streamerSid
+ });
+ } catch (err) {
+ console.error("[Watcher] offer/answer error:", err);
+ statusEl.textContent = "Error connecting to stream.";
+ cleanupPeer();
+ }
+ });
+
+ socket.on("ice-candidate", (data) => {
+ const { candidate, sdpMid, sdpMLineIndex, senderSid } = data;
+ if (!peerConnection || !candidate) return;
+ if (currentStreamerSid && senderSid !== currentStreamerSid) return;
+
+ peerConnection.addIceCandidate(new RTCIceCandidate({ candidate, sdpMid, sdpMLineIndex }))
+ .catch(err => console.error("[Watcher] addIceCandidate error:", err));
+ });
+ </script>
+</body>
+</html>
+