patx/mrhttp-asgi

Add mrq2 cluster

Commit dce7912 · Mark Reed · 2024-03-16T21:50:19-07:00

Changeset
dce79123062c3b12653147c4d6c5ebd9f06630d7
Parents
2fadf3d174c366b6ad5cb73f671dd3efcf9d3e62

View source at this commit

Comments

No comments yet.

Log in to comment

Diff

diff --git a/examples/9_mrworkserver.py b/examples/9_mrworkserver.py
index ba6a9b6..af65bbd 100755
--- a/examples/9_mrworkserver.py
+++ b/examples/9_mrworkserver.py
@@ -1,4 +1,6 @@
 
+# TODO Not sure we support json here at the moment.  Need to change to mrpacker or support both
+#
 # What is this?
 #
 # Json is posted to mrhttp and forwarded to a mrworkserver cluster using C code.  This is much faster than doing it using python
@@ -11,6 +13,10 @@
 # wait.  By gathering messages in mrworkserver I'm able to process many related messages 
 # together to speed up msg handling
 #
+# Note - /sq/ will use the user id in the session key to decide which server in the cluster gets the message.
+#        /q/{} will use the path arg to choose the cluster server - so /q/{thread_id} with 2 servers 
+#          would have the even ids go to server 0 and odd to server 1
+#
 # Setup the memcached and mrq servers first:
 #
 # memcached -l 127.0.0.1 -p 11211 -d -m 50
@@ -20,6 +26,7 @@
 # python tst.py 7101
 #
 # curl -i --raw http://localhost:8080/q/0/0/ -X POST -d '{"username":"xyz"}'
+# curl -H "Content-Type: application/mrpacker" --data-binary @tests/lua/test.mrp http://localhost:8080/q2/0
 #
 # To fetch a user's session and pass it to mrworkserver:
 #
@@ -35,7 +42,10 @@ import mrjson as json
 
 app = mrhttp.Application()
 app.config["memcache"] = [ ("127.0.0.1", 11211) ]
-app.config["mrq"] = [("127.0.0.1",7100),("127.0.0.1",7101)]
+
+# We setup 2 clusters of 1 server each
+app.config["mrq"] = [("127.0.0.1",7100)]
+app.config["mrq2"] = [("127.0.0.1",7101)]
 
 @app.route('/q/{}/{}/',options=['mrq'])
 def queue(r, s, t):
@@ -58,6 +68,13 @@ def session_queue(r):
   if r.servers_down:
     return "Servers not available, try again later"
   return 'Hello World!'
+
[email protected]('/q2/{}',options=['mrq2'])
+def queue2(r, some_id):
+  if r.servers_down:
+    return "Servers not available, try again later"
+  return 'Hello World!'
+   
    
 app.run(cores=1)
 
diff --git a/src/mrhttp/app.py b/src/mrhttp/app.py
index f10d33c..0a11ce9 100644
--- a/src/mrhttp/app.py
+++ b/src/mrhttp/app.py
@@ -68,10 +68,12 @@ class Application(mrhttp.CApp):
     self.listeners = { "at_start":[], "at_end":[], "after_start":[]}
     self._mc = None
     self._mrq = None
+    self._mrq2 = None
     self._mrc = None
     self.session_backend = "memcached"
     self.uses_session = False
     self.uses_mrq = False
+    self.uses_mrq2 = False
     self.err404 = "<html><head><title>404 Not Found</title></head><body><h1>Not Found</h1><p>The requested page was not found</p></body></html>"
     self.err400 = "<html><head><title>400 Bad Request</title></head><body><p>Invalid Request</p></body></html>"
    
@@ -115,6 +117,8 @@ class Application(mrhttp.CApp):
       self.uses_session = True
     if "mrq" in options:
       self.uses_mrq = True
+    if "mrq2" in options:
+      self.uses_mrq2 = True
 
     if not uri.startswith('/'): uri = '/' + uri
 
@@ -209,15 +213,23 @@ class Application(mrhttp.CApp):
       self._appStart() 
 
       if self.uses_mrq:
-        mrqconf = self.config.get("mrq", None)
-        if not mrqconf:
+        srvs = self.config.get("mrq", None)
+        if not srvs:
           print("When using MrQ app.config['mrq'] must be set. Exiting")
           exit(1)
-        srvs = self.config.get("mrq", None)
         if type(srvs) != list or len(srvs) == 0 or type(srvs[0]) != tuple:
           print("When using MrQ app.config['mrq'] must be set to a list of (host,port) tuple pairs. Exiting")
           exit(1)
         self._mrq = MrqClient( srvs, self.loop) 
+      if self.uses_mrq2:
+        srvs = self.config.get("mrq2", None)
+        if not srvs:
+          print("When using mrq2 app.config['mrq2'] must be set. Exiting")
+          exit(1)
+        if type(srvs) != list or len(srvs) == 0 or type(srvs[0]) != tuple:
+          print("When using MrQ app.config['mrq'] must be set to a list of (host,port) tuple pairs. Exiting")
+          exit(1)
+        self._mrq2 = MrqClient( srvs, self.loop) 
 
       if self.uses_session:
 
diff --git a/src/mrhttp/internals/app.h b/src/mrhttp/internals/app.h
index e19d456..016acc5 100644
--- a/src/mrhttp/internals/app.h
+++ b/src/mrhttp/internals/app.h
@@ -41,6 +41,7 @@ typedef struct {
   // Clients
   PyObject *py_mc;
   PyObject *py_mrq;
+  PyObject *py_mrq2;
   PyObject *py_mrc;
   PyObject *py_redis;
   PyObject *py_session_backend_type; // int 1,2,3 ( memcached, mrworkserver, mrcache )
diff --git a/src/mrhttp/internals/module.h b/src/mrhttp/internals/module.h
index 293094d..ff88879 100644
--- a/src/mrhttp/internals/module.h
+++ b/src/mrhttp/internals/module.h
@@ -85,6 +85,7 @@ static PyMethodDef MrhttpApp_methods[] = {
 static PyMemberDef MrhttpApp_members[] = {
     {"_mc", T_OBJECT, offsetof(MrhttpApp, py_mc), 0, NULL},
     {"_mrq", T_OBJECT, offsetof(MrhttpApp, py_mrq), 0, NULL},
+    {"_mrq2", T_OBJECT, offsetof(MrhttpApp, py_mrq2), 0, NULL},
     {"_redis", T_OBJECT, offsetof(MrhttpApp, py_redis), 0, NULL},
     {"_session_client", T_OBJECT, offsetof(MrhttpApp, py_session), 0, NULL},
     {"session_backend_type", T_OBJECT, offsetof(MrhttpApp, py_session_backend_type), 0, NULL},
diff --git a/src/mrhttp/internals/protocol.c b/src/mrhttp/internals/protocol.c
index a1c77b7..d38862c 100644
--- a/src/mrhttp/internals/protocol.c
+++ b/src/mrhttp/internals/protocol.c
@@ -328,7 +328,10 @@ void Protocol_on_memcached_reply( SessionCallbackData *scd, char *data, int data
   if ( !self->closed ) {
 
     Route *r = req->route;
-    if ( r->mrq ) { 
+    if ( r->mrq || r->mrq2 ) { 
+      MrqClient *py_mrq;
+      if ( r->mrq ) py_mrq = self->app->py_mrq;
+      if ( r->mrq2) py_mrq = self->app->py_mrq2;
       int slot = 0;
 
       // Pull slot from the first arg. Must be a number though a string won't break
@@ -374,7 +377,7 @@ void Protocol_on_memcached_reply( SessionCallbackData *scd, char *data, int data
             memcpy(p, data, data_sz);
             p += data_sz;
             *p++ = ']';
-            rc = MrqClient_pushj( (MrqClient*)self->app->py_mrq, slot, tmp, (int)(p-tmp) );
+            rc = MrqClient_pushj( py_mrq, slot, tmp, (int)(p-tmp) );
             free(tmp);
           } else {
 
@@ -385,7 +388,7 @@ void Protocol_on_memcached_reply( SessionCallbackData *scd, char *data, int data
             p += req->body_len;
             memcpy(p, data, data_sz);
             p += data_sz;
-            rc = MrqClient_push ( (MrqClient*)self->app->py_mrq, slot, tmp, (int)(p-tmp) );
+            rc = MrqClient_push ( py_mrq, slot, tmp, (int)(p-tmp) );
             free(tmp);
           }
 
@@ -429,9 +432,9 @@ void Protocol_on_memcached_reply( SessionCallbackData *scd, char *data, int data
         else {
           // Send body to mrq
           if ( req->py_mrpack == NULL ) {
-            MrqClient_pushj( (MrqClient*)self->app->py_mrq, slot, req->body, req->body_len );
+            MrqClient_pushj( py_mrq, slot, req->body, req->body_len );
           } else {
-            MrqClient_push ( (MrqClient*)self->app->py_mrq, slot, req->body, req->body_len );
+            MrqClient_push ( py_mrq, slot, req->body, req->body_len );
           }
         }
       } else {
@@ -513,13 +516,16 @@ Protocol* Protocol_on_body(Protocol* self, char* body, size_t body_len) {
     }
 
     // if mrq return now as the user is not logged in
-    if ( r->mrq ) {
+    if ( r->mrq || r->mrq2 ) {
       return Protocol_handle_request( self, self->request, r );
     }
 
     //?  PyObject *ret = pipeline_queue(self, (PipelineRequest){true, self->request, task});
   }
-  if ( r->mrq ) { //TODO
+  if ( r->mrq || r->mrq2 ) { 
+    MrqClient *py_mrq;
+    if ( r->mrq ) py_mrq = self->app->py_mrq;
+    if ( r->mrq2) py_mrq = self->app->py_mrq2;
     DBG printf("Route uses mrq\n"); 
     int slot = 0;
     // Pull slot from the first arg. Must be a number
@@ -531,9 +537,9 @@ Protocol* Protocol_on_body(Protocol* self, char* body, size_t body_len) {
 
     // Send body to mrq
     if ( self->request->py_mrpack == NULL ) {
-      MrqClient_pushj( (MrqClient*)self->app->py_mrq, slot, self->request->body, self->request->body_len );
+      MrqClient_pushj( py_mrq, slot, self->request->body, self->request->body_len );
     } else {
-      MrqClient_push ( (MrqClient*)self->app->py_mrq, slot, self->request->body, self->request->body_len );
+      MrqClient_push ( py_mrq, slot, self->request->body, self->request->body_len );
     }
 
     // TODO set a client member to say success/fail? Have to start failing if slow consumer / connection gone.
diff --git a/src/mrhttp/internals/router.c b/src/mrhttp/internals/router.c
index 5d4df8b..ae9f9cc 100644
--- a/src/mrhttp/internals/router.c
+++ b/src/mrhttp/internals/router.c
@@ -69,7 +69,7 @@ PyObject *Router_setupRoutes (Router* self) {
     r = PyList_GetItem(sroutes, i);
     rte->iscoro  = false;
     rte->session = false;
-    rte->mrq = false; rte->append_user = false;
+    rte->mrq = false; rte->mrq2 = false; rte->append_user = false;
 
     PyObject *handler = PyLong_AsVoidPtr(PyDict_GetItemString( r, "handler" ));
     rte->func = handler;
@@ -78,6 +78,7 @@ PyObject *Router_setupRoutes (Router* self) {
     if ( Py_True == PyDict_GetItemString( r, "iscoro"  ) ) rte->iscoro  = true;
     if ( Py_True == PyDict_GetItemString( r, "session" ) ) rte->session = true;
     if ( Py_True == PyDict_GetItemString( r, "mrq" ) ) rte->mrq = true;
+    if ( Py_True == PyDict_GetItemString( r, "mrq2" ) ) rte->mrq2 = true;
     if ( Py_True == PyDict_GetItemString( r, "append_user" ) ) rte->append_user = true;
     //if ( Py_True == PyDict_GetItemString( r, "append_user" ) ) printf("mrq append user set\n");
     o = PyDict_GetItemString( r, "type"  );
@@ -111,10 +112,11 @@ PyObject *Router_setupRoutes (Router* self) {
     rte->path = PyUnicode_AsUTF8AndSize( o, &(rte->len) );
     DBG printf( " path len %ld str %.*s\n", rte->len, (int)rte->len, rte->path );
 
-    rte->iscoro  = false; rte->session = false; rte->mrq = false;
+    rte->iscoro  = false; rte->session = false; rte->mrq = false; rte->mrq2 = false;
     if ( Py_True == PyDict_GetItemString( r, "iscoro"      ) ) rte->iscoro = true;
     if ( Py_True == PyDict_GetItemString( r, "session"     ) ) rte->session = true;
     if ( Py_True == PyDict_GetItemString( r, "mrq"         ) ) rte->mrq = true;
+    if ( Py_True == PyDict_GetItemString( r, "mrq2"         ) ) rte->mrq2 = true;
     if ( Py_True == PyDict_GetItemString( r, "append_user" ) ) rte->append_user = true;
     o = PyDict_GetItemString( r, "type"  );
     if (o) rte->mtype = PyLong_AsLong(o);
diff --git a/src/mrhttp/internals/router.h b/src/mrhttp/internals/router.h
index 36139b2..f7bbd2f 100644
--- a/src/mrhttp/internals/router.h
+++ b/src/mrhttp/internals/router.h
@@ -15,6 +15,7 @@ typedef struct {
   bool iscoro;
   bool session;
   bool mrq;
+  bool mrq2;
   bool append_user;
   char mtype;
   int max_byte_size;
diff --git a/src/mrhttp/router.py b/src/mrhttp/router.py
index 607f2c9..9365276 100644
--- a/src/mrhttp/router.py
+++ b/src/mrhttp/router.py
@@ -47,6 +47,7 @@ class Router(mrhttp.CRouter):
     #r["user_key"] = optiondict.get("append_user_key",None)
     if "session" in options: r["session"] = True
     if "mrq" in options: r["mrq"] = True
+    if "mrq2" in options: r["mrq2"] = True
     if "append_user" in options: r["append_user"] = True
     # Static routes
     if not "{" in uri:
diff --git a/workserver.py b/workserver.py
index cb1fdf9..61c8566 100644
--- a/workserver.py
+++ b/workserver.py
@@ -3,6 +3,7 @@ import mrworkserver
 
 async def callback(ws, msgs):
   for m in msgs:
+    print(m)
     pass
 
 ws = mrworkserver.WorkServer(callback=callback)