Implementing Chat Using JSON-RPC
Introduction
In the previous tutorial we showed how to use asynchronous applications to implement chat. The example was very primitive and had almost no error handling.
Now we will create a better example using JSON-RPC and asynchronous applications.
API
The JSON-RPC service has the following methods:
post(author, message)
- notification: posts a new message to the chat board under the name author
get(from)
- method: receives the chat messages starting from message number from. Returns an array of objects like: { "author" : "name", "message" : "The message on the board" }
This API allows us to return multiple messages at once when somebody new joins the chat, and it provides better error handling.
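To make the API more concrete, here is a minimal sketch of what the request bodies for the two calls could look like on the wire. It assumes JSON-RPC 1.0 style framing, where a notification carries "id":null and a method call carries a client-chosen id. The helper names json_quote, make_post_request and make_get_request are hypothetical and are not part of CppCMS or of the contrib client.

```cpp
#include <cstdio>
#include <string>

// Hypothetical helper: quote a string as a JSON string literal.
// Escapes double quotes, backslashes and ASCII control characters.
std::string json_quote(std::string const &s)
{
    std::string out = "\"";
    for (std::string::size_type i = 0; i < s.size(); ++i) {
        unsigned char c = static_cast<unsigned char>(s[i]);
        if (c == '"' || c == '\\') {
            out += '\\';
            out += s[i];
        } else if (c < 0x20) {
            char buf[8];
            std::snprintf(buf, sizeof(buf), "\\u%04x", static_cast<unsigned>(c));
            out += buf;
        } else {
            out += s[i];
        }
    }
    out += '"';
    return out;
}

// Assumed framing for the post notification: no reply is expected,
// so the id is null.
std::string make_post_request(std::string const &author, std::string const &message)
{
    return "{\"method\":\"post\",\"params\":[" + json_quote(author) + ","
         + json_quote(message) + "],\"id\":null}";
}

// Assumed framing for the get method: the id is used to match the reply.
std::string make_get_request(unsigned from, unsigned id)
{
    return "{\"method\":\"get\",\"params\":[" + std::to_string(from)
         + "],\"id\":" + std::to_string(id) + "}";
}
```

Under these assumptions, make_get_request(0, 1) asks for the whole board, and a client that has already seen three messages would pass 3 as the first argument.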
Server Side Code
Like any JSON-RPC service, our class derives from cppcms::rpc::json_rpc_server.
Data Members
Our class has the following members.
All the messages already in JSON format:
std::vector<cppcms::json::value> messages_;
The set of waiters. Note that cppcms::rpc::json_call plays the same role as cppcms::http::context does for an asynchronous application:
typedef std::set<booster::shared_ptr<cppcms::rpc::json_call> > waiters_type;
waiters_type waiters_;
We also add a timer in order to make sure that there is no "too-long" polling. The timer wakes us periodically and lets us complete the pending requests if nothing has happened for about 10 seconds.
booster::aio::deadline_timer timer_;
This records the last time the waiters were woken up, either by a new post to the chat or by the timer:
time_t last_wake_;
Constructor
chat(cppcms::service &srv) :
    cppcms::rpc::json_rpc_server(srv),
    timer_(srv.get_io_service())
{
    // Our main methods
    bind("post",cppcms::rpc::json_method(&chat::post,this),notification_role);
    bind("get",cppcms::rpc::json_method(&chat::get,this),method_role);

    // Add timeouts to the system
    last_wake_ = time(0);
    on_timer(booster::system::error_code());
}
We bind the two functions, post as a JSON-RPC notification and get as a JSON-RPC method, and we initialize our timer in the constructor.
In the last line we "simulate" a timer event. Since on_timer restarts the timer on every event, this starts the periodic wake-ups. We will see the code later.
Post Method
void post(std::string const &author,std::string const &message)
{
    cppcms::json::value obj;
    obj["author"]=author;
    obj["message"]=message;
    messages_.push_back(obj);
    broadcast(messages_.size()-1);
}
We receive the message, update the state, and notify all waiters by sending them the messages starting from the index of the new one, similarly to broadcast() in the previous example.
Broadcasting
Broadcasting is similar to what we used before.
void broadcast(size_t from)
{
    // update timeout
    last_wake_ = time(0);
    // Prepare response
    cppcms::json::value response = make_response(from);
    // Send it to everybody
    for(waiters_type::iterator waiter=waiters_.begin();waiter!=waiters_.end();++waiter) {
        booster::shared_ptr<cppcms::rpc::json_call> call = *waiter;
        call->return_result(response);
    }
    waiters_.clear();
}
Note: return_result() completes the asynchronous response automatically.
The make_response function just generates the response we need.
cppcms::json::value make_response(size_t n)
{
    cppcms::json::value v;
    // Small optimization
    v=cppcms::json::array();
    cppcms::json::array &ar = v.array();
    ar.reserve(messages_.size() - n);
    // prepare all messages
    for(size_t i=n;i<messages_.size();i++) {
        ar.push_back(messages_[i]);
    }
    return v;
}
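The interplay of post, broadcast and the response slice can be tried out without CppCMS. The following is only a simplified model, not the tutorial's class: it assumes plain strings instead of cppcms::json::value and std::function callbacks instead of cppcms::rpc::json_call objects, and the names board_model, wait and waiting are hypothetical.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Simplified stand-in for the chat class (illustration only).
class board_model {
public:
    typedef std::vector<std::string> reply;
    typedef std::function<void(reply const &)> waiter;

    // Park a client until the next broadcast.
    void wait(waiter const &w) { waiters_.push_back(w); }

    // Store the message and wake everybody with just the new entry.
    void post(std::string const &message)
    {
        messages_.push_back(message);
        broadcast(messages_.size() - 1);
    }

    // Answer every parked client with messages [from, end) and forget them.
    // Precondition: from <= size(). from == size() yields an empty reply.
    void broadcast(std::size_t from)
    {
        reply response(messages_.begin() + static_cast<std::ptrdiff_t>(from),
                       messages_.end());
        // Detach the waiters first so a callback may safely call wait() again.
        std::vector<waiter> pending;
        pending.swap(waiters_);
        for (std::size_t i = 0; i < pending.size(); ++i)
            pending[i](response);
    }

    std::size_t size() const { return messages_.size(); }
    std::size_t waiting() const { return waiters_.size(); }

private:
    std::vector<std::string> messages_;
    std::vector<waiter> waiters_;
};
```

In this model each waiter is answered exactly once and then dropped, which mirrors how waiters_.clear() follows the loop over return_result() above. A client that wants more messages has to register again.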
Get Method
The get method:
void get(unsigned from) { ... }
is very similar. First we handle the case when we can return the result immediately:
if(from < messages_.size()) { return_result(make_response(from)); }
Then we handle the long-polling situation:
else if(from == messages_.size()) {
    booster::shared_ptr<cppcms::rpc::json_call> call=release_call();
    waiters_.insert(call);
    call->context().async_on_peer_reset(
        boost::bind(
            &chat::remove_context,
            booster::intrusive_ptr<chat>(this),
            call));
}
Notes:
- We use release_call(), which has semantics similar to release_context() but is used with JSON-RPC.
- In order to handle disconnect events we use the cppcms::http::context, which can still be accessed from the call.
And finally we report invalid counters as an error:
else { return_error("Invalid position"); }
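The three branches of get can be summarized as a small decision function. This is a standalone sketch for illustration; the enum and the name classify_get are hypothetical and do not appear in the tutorial's class.

```cpp
#include <cstddef>

// Possible outcomes of a get(from) request (hypothetical names).
enum get_action {
    reply_now,        // there are unseen messages: answer immediately
    wait_for_post,    // client is up to date: keep the call for long polling
    invalid_position  // client claims to have seen more than exists
};

// Decide what to do with a request for messages starting at `from`
// when the board currently holds `board_size` messages.
get_action classify_get(std::size_t from, std::size_t board_size)
{
    if (from < board_size)
        return reply_now;
    if (from == board_size)
        return wait_for_post;
    return invalid_position;
}
```

For example, on a board with three messages a request with from equal to 1 is answered at once, a request with from equal to 3 waits, and a request with from equal to 4 is an error.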
Timers
We do not want to rely on async_on_peer_reset alone, so we use a timer that wakes us once in a while and makes sure that there is no "too-long" polling.
This is our on_timer function, which receives an error code as its first parameter.
void on_timer(booster::system::error_code const &e) {
A deadline timer can report only one type of error, timer cancellation, so if we get an error we simply do not restart the timer:
if(e) return; // cancelation
Then we check whether there has been no broadcast for more than 10 seconds. If so, we force a broadcast that returns an empty array:
if(time(0) - last_wake_ > 10) { broadcast(messages_.size()); }
Then we restart the timer:
timer_.expires_from_now(booster::ptime::seconds(1));
timer_.async_wait(
    boost::bind(
        &chat::on_timer,
        booster::intrusive_ptr<chat>(this),
        _1));
Note that the _1 placeholder stands for the first parameter passed to the handler on_timer, which is the error code of the completed operation.
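The idle check performed on every timer tick can be isolated into a small predicate. The sketch below is a standalone illustration; the function name should_force_broadcast and its parameters are hypothetical, and std::difftime is used instead of plain subtraction only to stay portable.

```cpp
#include <ctime>

// Returns true when more than `idle_limit_seconds` have passed since the
// last broadcast, i.e. when the parked requests should be completed with
// an empty array.
bool should_force_broadcast(std::time_t now, std::time_t last_wake,
                            long idle_limit_seconds)
{
    return std::difftime(now, last_wake) > static_cast<double>(idle_limit_seconds);
}
```

With a one second tick and a strict "more than 10" comparison, an idle request is released on the first tick at which more than ten seconds have elapsed since the last broadcast.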
Client Side
We use a simple jsonrpc client that you can find in the contrib section of the CppCMS sources.
Form
We create a simple form that allows us to send a message using the send_data() function.
We also add a special Reconnect button, which calls reconnect_to_server() and allows us to restart the connection in case of an error.
<form id="theform" >
  <p>Name: <input id="author" type="text" value="" /></p>
  <p>Message: <input id="message" type="text" value="" /></p>
  <p>
    <input type="submit" value="Send" onclick="return send_data()"/>
    <input disabled="disabled" id='reconnect' type='submit' value='Reconnect' onclick='return reconnect_to_server()' >
  </p>
  <p id='error_message'></p>
</form>
JavaScript
Now let's look at our JavaScript code.
First we create a global JsonRPC object with two methods, get and post, whose URL is /chat:
rpc = new JsonRPC('/chat',['get'],['post']);
Note, the first array is the list of methods and the second is the list of notifications.
Next we create a message counter, like in the previous example:
message_count = 0;
Then we set up our callbacks:
rpc.get.on_result = function(messages) {
    var messagesHtml = document.getElementById('messages');
    for(var i=0;i<messages.length;i++) {
        // keep m local instead of leaking an implicit global
        var m=messages[i];
        messagesHtml.innerHTML+='<dt>' + m.author +'</dt>' +
                                '<dd>' + m.message + '</dd>';
        message_count++;
    }
    restart();
}
When the get call completes successfully, it updates the "messages" element with the new data we received and calls restart() to continue the operation.
Where restart is:
function restart() { rpc.get(message_count); }
This is an asynchronous call of the get method.
We also set up the on_error callback for our method.
If an error occurs we disable the chat (we do not call restart(), and we enable the reconnect button) and show an error message to the user with make_error:
rpc.get.on_error = function(e) {
    make_error('Getting New Messages',e);
    document.getElementById('reconnect').disabled = false;
}
In the same way we set up the callbacks for the post method:
rpc.post.on_result = function() {
    // reset the form content
    document.getElementById("message").value = '';
}
rpc.post.on_error = function(e) {
    make_error('Posting New Messages',e);
}
As we have seen, when we press the "Send" button the send_data() function is called.
We define it this way:
function send_data() {
    // local variables instead of implicit globals
    var author = document.getElementById('author').value;
    var message = document.getElementById("message").value;
    rpc.post(author,message);
    return false;
}
Note: rpc.post is an asynchronous RPC call with two parameters.
And last but not least, the function that allows us to reconnect to the web server:
function reconnect_to_server() {
    message_count = 0;
    document.getElementById('error_message').innerHTML = '';
    document.getElementById('messages').innerHTML = '';
    document.getElementById('reconnect').disabled = true;
    restart();
    return false;
}
We clear all the content we got and start long polling once again.
That's it.
Full Code
You can find the full code here