Implementing Chat
Introduction
The HTTP protocol is by its nature designed so that the client initiates a request and the web server responds to it.
However, it is frequently necessary to push real-time data to the client, that is, to handle server-side events.
One of the most basic techniques for implementing server-side events is long polling:
- The client initiates a request
- The server does not respond immediately, but rather responds only when some server side event occurs
- The client receives the event and starts long polling once again by returning to the first step.
Ordinary CppCMS applications run in a thread pool, so not responding immediately would keep a thread busy for a long time and this technique would not scale well.
So, in order to handle long-running requests, CppCMS provides asynchronous applications: applications that can postpone the response and handle multiple requests simultaneously.
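As a minimal sketch of the pattern, assuming only the release_context() and async_complete_response() calls that the chat application below also uses, an asynchronous handler can detach the request context and complete it later:

void handler()
{
    // Take ownership of the request/response pair so that it outlives
    // this handler instead of being completed when the handler returns.
    booster::shared_ptr<cppcms::http::context> ctx = release_context();
    // ... store ctx somewhere; later, when the server side event occurs:
    ctx->response().out() << "event data";
    ctx->async_complete_response();
}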
The API
As an example of using Comet techniques with CppCMS we will implement a very simple chat service that works in the following way:
- We can post a new message to the chat by sending a POST request to /chat/post (the event). The form element should be "message".
- We can get message number N from the chat by sending a long polling request to the URL /chat/get/N. If the message exists, it is returned immediately as plain text. Otherwise, it is returned once the message becomes ready (a post event occurs).
Client Side
We will use the Dojo toolkit on the client side to make things simpler.
Let's define our new message form:
<form id="theform" >
    <input id="message" type="text" name="message" value="" />
    <input type="submit" value="Send" onclick="return send_data()"/>
</form>
When the user clicks the "Send" button, the send_data() function is called:
function send_data()
{
    var kw = {
        url  : "/chat/post",
        form : "theform"
    };
    dojo.xhrPost(kw);
    dojo.byId("message").value="";
    return false;
}
It sends the form as a POST request and resets the message field.
We also need to set up the long polling loop.
Let's define a message counter so that we know which message number to request from the Comet API:
var message_count = 0;
Then we set up the function that is called in the asynchronous client-side loop:
function read_data()
{
    dojo.xhrGet( {
        url: "/chat/get/" + message_count,
        timeout: 10000,
        handleAs: "text",
        load: function(response, ioArgs) {
            dojo.byId("messages").innerHTML =
                response + '<br/>' + dojo.byId("messages").innerHTML;
            message_count++;
            read_data();
            return response;
        },
        error: function(response, ioArgs) {
            read_data();
            return response;
        }
    });
}
When we receive a response, we update the HTML with the new message, increase the message count and restart the process.
And finally we start the loop on page load.
dojo.addOnLoad(read_data);
Server Side
Mounting the Asynchronous Application
On the server side we need to create an asynchronous application that will handle our requests.
Unlike synchronous applications, which are created on demand by a special factory class, asynchronous applications are mounted as a single instance only, since they manage multiple connections on their own.
So for the class chat, the main function would look like:
int main(int argc,char **argv)
{
    try {
        cppcms::service service(argc,argv);
        booster::intrusive_ptr<chat> c=new chat(service);
        service.applications_pool().mount(c);
        service.run();
    }
    catch(std::exception const &e) {
        std::cerr<<"Error: "<<e.what()<<std::endl;
        return 1;
    }
    return 0;
}
Note: we use the smart pointer intrusive_ptr to handle the lifetime of the chat application, and we mount this pointer directly into the applications pool. We are responsible for keeping a pointer to the application; if the reference counter drops to 0, the application is destroyed. So we keep it on the stack for the duration of service.run().
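One detail worth noting: the client-side code above requests /chat/post and /chat/get/N, while the dispatcher shown below maps only /post and /get/..., so the /chat prefix is expected to be the script name under which the application is mounted. One possible way to arrange this is sketched here (the mount_point overload and the "/chat" name are assumptions for illustration; the script name can also be set in the service configuration):

#include <cppcms/mount_point.h>

// Mount the chat application under the "/chat" script name so that
// requests such as /chat/post and /chat/get/5 are routed to it.
service.applications_pool().mount(c, cppcms::mount_point("/chat"));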
The Application
We declare the class as a usual application and define all the required URL mappings:
class chat : public cppcms::application {
public:
    chat(cppcms::service &srv) : cppcms::application(srv)
    {
        dispatcher().assign("/post",&chat::post,this);
        dispatcher().assign("/get/(\\d+)",&chat::get,this,1);
    }
    ...
};
Now, because we use the long polling technique, we need to store our connections somewhere. All request-response related data is stored within the cppcms::http::context object. We can obtain a smart pointer to it and manage its lifetime as we need.
So we define a set of long polling contexts we want to notify:
private:
    typedef std::set<booster::shared_ptr<cppcms::http::context> > waiters_type;
    waiters_type waiters_;
We also need some storage for the messages within our class:
std::vector<std::string> messages_;
Now let's handle our POST request:
void post()
{
    if(request().request_method()=="POST") {
        messages_.push_back(request().post("message"));
        broadcast();
    }
}
Upon a POST request we add a new message to the messages_ vector and call the broadcast() function that notifies all the connections waiting for a response. We will show it later.
Now let's see how we handle get requests:
void get(std::string no)
{
    ...
}
First we extract the number of the message we want to send:
unsigned pos=atoi(no.c_str());
If it is ready, we send it immediately:
if(pos < messages_.size()) {
    response().set_plain_text_header();
    response().out()<<messages_[pos];
    return;
}
Then we check whether the number is valid (not bigger than the total number of messages); if not, we return an error:
if(pos > messages_.size()) {
    response().status(404);
    return;
}
Otherwise, if it is the next message to arrive, we start a long polling session:
booster::shared_ptr<cppcms::http::context> context=release_context();
waiters_.insert(context);
context->async_on_peer_reset(
    bind(
        &chat::remove_context,
        booster::intrusive_ptr<chat>(this),
        context));
Let's go through it line by line:
- We detach the context from the application using the release_context() member function. This is very important, because otherwise the response would be completed as soon as we exit the get() function.
- We add the context to our waiters_ set.
- The last call defines a reset handler. The client may have closed the connection; we do not want to keep such a connection around, so we add a handler that removes the context from the waiters_ set on connection reset.
Our remove_context function looks like this:
void remove_context(booster::shared_ptr<cppcms::http::context> context)
{
    waiters_.erase(context);
}
Notes:
- We pass an intrusive_ptr to the handler to make sure that the application stays alive as long as the handler exists. In our case it is not really needed, because the application exists as long as the service is running, but for applications that may be created and destroyed dynamically (say, a chat room) it may be important.
- Generally it is good to handle async_on_peer_reset, but not all web servers report it to the application, so in general it is not reliable. Thus it is a good idea to add a timer that cleans up "too-long" polling requests, as sketched below.
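A minimal sketch of such a cleanup timer, assuming booster's deadline_timer API; start_cleanup_timer() and on_timer() are hypothetical helpers added to the chat class, and the 30-second interval together with the 408 status are arbitrary choices:

// Headers (at the top of the file):
#include <functional>
#include <booster/aio/deadline_timer.h>
#include <booster/posix_time.h>
#include <booster/system_error.h>

// A member timer that periodically drops pending long polling requests
// so that stale connections do not accumulate forever.
booster::aio::deadline_timer timer_;

void start_cleanup_timer()
{
    timer_.set_io_service(service().get_io_service());
    timer_.expires_from_now(booster::ptime::seconds(30));
    timer_.async_wait(
        std::bind(&chat::on_timer,
                  booster::intrusive_ptr<chat>(this),
                  std::placeholders::_1));
}

void on_timer(booster::system::error_code const &e)
{
    if(e) return; // the timer was cancelled or the service is shutting down
    // Finish every pending request with a "408 Request Timeout" status;
    // the client-side error handler simply restarts the long polling loop.
    for(waiters_type::iterator it=waiters_.begin();it!=waiters_.end();++it) {
        (*it)->response().status(408);
        (*it)->async_complete_response();
    }
    waiters_.clear();
    start_cleanup_timer(); // rearm the timer for the next round
}

start_cleanup_timer() could be called once, for example at the end of the chat constructor.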
Now let's see how we broadcast the responses to the clients:
void broadcast()
{
    for(waiters_type::iterator it=waiters_.begin();it!=waiters_.end();++it) {
        booster::shared_ptr<cppcms::http::context> waiter = *it;
        waiter->response().set_plain_text_header();
        waiter->response().out() << messages_.back();
        waiter->async_complete_response();
    }
    waiters_.clear();
}
When broadcast() is called by the post() function, we send the message to each one of the stored contexts and call async_complete_response(), which finalizes the response. Once we have called async_complete_response() we can destroy our shared_ptr to the cppcms::http::context, as it is no longer our responsibility.
Small note: if you want to implement a "streaming" technique, you can use async_flush_output() with an appropriate completion handler.
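As a rough sketch of that idea (assuming the completion_type callback signature of async_flush_output(); send_chunk() and on_chunk_sent() are hypothetical helpers, not part of this example):

// Write one chunk and schedule the next one only after the previous
// chunk has actually been flushed to the client.
void send_chunk(booster::shared_ptr<cppcms::http::context> ctx)
{
    ctx->response().out() << "some partial data\n";
    ctx->async_flush_output(
        std::bind(&chat::on_chunk_sent,
                  booster::intrusive_ptr<chat>(this),
                  ctx,
                  std::placeholders::_1));
}

void on_chunk_sent(booster::shared_ptr<cppcms::http::context> ctx,
                   cppcms::http::context::completion_type status)
{
    if(status != cppcms::http::context::operation_completed)
        return; // the peer went away; just drop the context
    // ... either stream the next chunk with send_chunk(ctx)
    // or finish with ctx->async_complete_response().
}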
Full Code
You can find the full code for this example here