Implementing Chat Using JSON-RPC
Introduction
In the previous tutorial we showed how to use asynchronous applications to implement a chat. That example was very primitive and had almost no error handling.
Now we will create a better example using JSON-RPC and asynchronous applications.
API
The JSON-RPC service provides the following methods:
- post(author,message) - a notification: posts a new message to the chat board under the name author.
- get(from) - a method: receives the chat messages starting from message number from. Returns an array of objects like: { "author" : "name", "message" : "The message on the board" }
This API allows us to return multiple messages at once when somebody new joins the chat, and it provides better error handling.
Server Side Code
Like any JSON-RPC service, our class derives from cppcms::rpc::json_rpc_server.
Data Members
Our class has the following members.
All the messages, already in JSON format:
std::vector<cppcms::json::value> messages_;
The set of waiters. Note that cppcms::rpc::json_call plays the same role here as cppcms::http::context does in an asynchronous application:
typedef std::set<booster::shared_ptr<cppcms::rpc::json_call> > waiters_type;
waiters_type waiters_;
We also add a timer in order to make sure that no request is kept polling for too long. The timer wakes us periodically (once a second in the code below) and lets us complete the waiting requests if nothing has happened for more than 10 seconds.
booster::aio::deadline_timer timer_;
This records the last time the waiters were woken up, either because something was posted to the chat or because of a timeout:
time_t last_wake_;
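As a rough illustration of why a set of shared pointers suits the waiters, here is a library-free sketch. It assumes std::shared_ptr in place of booster::shared_ptr, and fake_call and drop_waiter are hypothetical names standing in for cppcms::rpc::json_call and for whatever cleanup the real service performs; it is not the CppCMS API.

```cpp
#include <cassert>
#include <memory>
#include <set>

// Hypothetical stand-in for cppcms::rpc::json_call (an assumption, not the real type).
struct fake_call {
    int id;
};

typedef std::set<std::shared_ptr<fake_call> > waiters_type;

// Remove one waiting call, for example after its client disconnected.
// Returns true if the call was actually waiting.
bool drop_waiter(waiters_type &waiters, std::shared_ptr<fake_call> const &call)
{
    return waiters.erase(call) == 1;
}

// Usage sketch: the set keeps each call once and finds it by pointer identity.
void waiters_example()
{
    waiters_type waiters;
    std::shared_ptr<fake_call> call = std::make_shared<fake_call>();
    waiters.insert(call);
    waiters.insert(call);                 // inserting the same pointer again is a no-op
    assert(waiters.size() == 1);
    assert(drop_waiter(waiters, call));
    assert(!drop_waiter(waiters, call));  // already removed
    assert(waiters.empty());
}
```

Because the set is keyed by the pointer itself, a disconnect handler only needs the call it was bound to in order to remove it.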
Constructor
chat(cppcms::service &srv) :
    cppcms::rpc::json_rpc_server(srv),
    timer_(srv.get_io_service())
{
    // Our main methods
    bind("post",cppcms::rpc::json_method(&chat::post,this),notification_role);
    bind("get",cppcms::rpc::json_method(&chat::get,this),method_role);
    // Add timeouts to the system
    last_wake_ = time(0);
    on_timer(booster::system::error_code());
}
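We bind the two member functions post and get as JSON-RPC methods (post as a notification, get as a regular method) and we initialize our timer in the constructor.
In the last line we "simulate" a timer event by calling on_timer() with a default, no-error error code. On every timer event we restart the timer; we will see that code later.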
Post Method
void post(std::string const &author,std::string const &message)
{
    cppcms::json::value obj;
    obj["author"]=author;
    obj["message"]=message;
    messages_.push_back(obj);
    broadcast(messages_.size()-1);
}
We receive the message, append it to the message list, and notify all waiters, sending them the messages starting from the previous size of the list (that is, the new message). This is similar to broadcast() in the previous example.
Broadcasting
Broadcasting is similar to what we did in the previous example.
void broadcast(size_t from)
{
    // update timeout
    last_wake_ = time(0);
    // Prepare response
    cppcms::json::value response = make_response(from);
    // Send it to everybody
    for(waiters_type::iterator waiter=waiters_.begin();waiter!=waiters_.end();++waiter) {
        booster::shared_ptr<cppcms::rpc::json_call> call = *waiter;
        call->return_result(response);
    }
    waiters_.clear();
}
Note: return_result() completes the asynchronous response automatically.
The make_response function just generates the response we need:
cppcms::json::value make_response(size_t n)
{
    cppcms::json::value v;
    // Small optimization
    v=cppcms::json::array();
    cppcms::json::array &ar = v.array();
    ar.reserve(messages_.size() - n);
    // prepare all messages
    for(size_t i=n;i<messages_.size();i++) {
        ar.push_back(messages_[i]);
    }
    return v;
}
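The slicing done by make_response can be tried without CppCMS. The following is a minimal sketch that assumes plain std::string messages instead of cppcms::json::value; tail_from is a hypothetical name. Unlike the function above, it clamps n, because messages_.size() - n would wrap around to a huge unsigned value if n were ever larger than the number of messages (the callers shown in this tutorial never pass such a value).

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Library-free model of make_response(): copy messages[n..end) into a new vector.
// An out-of-range n yields an empty result instead of an unsigned underflow
// in the size passed to reserve().
std::vector<std::string> tail_from(std::vector<std::string> const &messages, std::size_t n)
{
    std::vector<std::string> out;
    if(n >= messages.size())
        return out;
    out.reserve(messages.size() - n);
    for(std::size_t i = n; i < messages.size(); ++i)
        out.push_back(messages[i]);
    return out;
}

// Usage sketch.
void tail_from_example()
{
    std::vector<std::string> all;
    all.push_back("first");
    all.push_back("second");
    all.push_back("third");
    assert(tail_from(all, 1).size() == 2);     // "second" and "third"
    assert(tail_from(all, 1)[0] == "second");
    assert(tail_from(all, 3).empty());         // nothing new: what a timeout wake-up sends
    assert(tail_from(all, 7).empty());         // clamped instead of underflowing
}
```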
Get Method
The get method has the following outline:
void get(unsigned from)
{
    ...
}
It works in a similar way. First we handle the case when we can return the result immediately:
if(from < messages_.size()) {
    return_result(make_response(from));
}
Then we handle the long-polling situation:
else if(from == messages_.size()) {
    booster::shared_ptr<cppcms::rpc::json_call> call=release_call();
    waiters_.insert(call);
    call->context().async_on_peer_reset(
        boost::bind(
            &chat::remove_context,
            booster::intrusive_ptr<chat>(this),
            call));
}
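Notes:
- We use release_call(), which has similar semantics to release_context() but is used with JSON-RPC.
- In order to handle disconnect events we use the cppcms::http::context, which can still be accessed from the call.
- The remove_context() handler bound above is a member of chat that is not listed in this section.
And finally we reject invalid counters, that is, a from value beyond the number of stored messages: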
else {
    return_error("Invalid position");
}
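The three branches of get, together with post and broadcast, can be modelled without CppCMS to see the long-polling flow end to end. This is only a sketch under stated assumptions: poll_board is a hypothetical class, std::function callbacks (C++11) stand in for the released json_call objects, and plain strings stand in for the JSON messages.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, library-free stand-in for the chat class.
class poll_board {
public:
    typedef std::vector<std::string> batch;
    typedef std::function<void(batch const &)> reply;
    enum outcome { replied_now, parked, rejected };

    // Mirrors chat::get(): reply at once, park the caller, or reject.
    outcome get(std::size_t from, reply cb)
    {
        if(from < messages_.size()) {
            cb(batch(messages_.begin() + static_cast<std::ptrdiff_t>(from), messages_.end()));
            return replied_now;
        }
        if(from == messages_.size()) {
            waiters_.push_back(std::move(cb));  // nothing new yet: long polling
            return parked;
        }
        return rejected;                        // the caller asks for messages that do not exist
    }

    // Mirrors chat::post() followed by broadcast(messages_.size()-1).
    void post(std::string const &message)
    {
        messages_.push_back(message);
        batch fresh(messages_.end() - 1, messages_.end());
        // Swap first, so a callback that calls get() again parks itself
        // in the new list instead of modifying the one being iterated.
        std::vector<reply> ready;
        ready.swap(waiters_);
        for(std::size_t i = 0; i < ready.size(); ++i)
            ready[i](fresh);
    }

    std::size_t waiting() const { return waiters_.size(); }

private:
    batch messages_;
    std::vector<reply> waiters_;
};

// Usage sketch: a client polls an empty board and is answered by the next post.
void poll_board_example()
{
    poll_board board;
    poll_board::batch seen;
    assert(board.get(0, [&seen](poll_board::batch const &b) { seen = b; }) == poll_board::parked);
    board.post("hello");
    assert(seen.size() == 1 && seen[0] == "hello");
    assert(board.waiting() == 0);
}
```

Each parked caller is answered exactly once and then forgotten, which is why the client has to issue a new get after every result.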
Timers
We do not want to rely on async_on_peer_reset alone, so we use a timer that wakes us once in a while and makes sure that there is no "too-long" polling.
This is our on_timer function, which receives an error code as its first parameter:
void on_timer(booster::system::error_code const &e)
{
A deadline timer can report only one type of error, timer cancellation, so if we get an error we simply do not restart the timer:
    if(e) return; // cancellation
Then we check whether there has been no broadcast for more than 10 seconds. If so, we force a broadcast, which returns an empty array:
    if(time(0) - last_wake_ > 10) {
        broadcast(messages_.size());
    }
Then we restart the timer:
    timer_.expires_from_now(booster::ptime::seconds(1));
    timer_.async_wait(
        boost::bind(
            &chat::on_timer,
            booster::intrusive_ptr<chat>(this),
            _1));
}
Note that the _1 placeholder stands for the first argument passed to the on_timer handler, which is the error code of the completed operation.
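The effect of combining a one-second tick with a 10-second limit can be checked with a small simulation. This is a library-free sketch, not the service code: count_forced_wakes is a hypothetical function, time is counted in whole seconds from zero, and no posts arrive during the simulated period.

```cpp
#include <cassert>

// Model of the check in on_timer(): the timer fires every tick_seconds
// (assumed to be positive), and a wake-up is forced when more than
// limit_seconds have passed since the last one. Returns how many wake-ups
// are forced during total_seconds without any posts.
int count_forced_wakes(int total_seconds, int limit_seconds, int tick_seconds = 1)
{
    int last_wake = 0;   // plays the role of last_wake_
    int forced = 0;
    for(int now = tick_seconds; now <= total_seconds; now += tick_seconds) {
        if(now - last_wake > limit_seconds) {
            ++forced;          // broadcast(messages_.size()) would run here
            last_wake = now;   // broadcast() resets last_wake_
        }
    }
    return forced;
}

// Usage sketch.
void forced_wakes_example()
{
    assert(count_forced_wakes(10, 10) == 0);  // the comparison is strict
    assert(count_forced_wakes(11, 10) == 1);
    assert(count_forced_wakes(25, 10) == 2);  // at seconds 11 and 22
}
```

With these values an idle waiter is released after 11 seconds rather than 10, because the check uses a strict "greater than" comparison.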
Client Side
We use a simple JSON-RPC client that you can find in the contrib section of the CppCMS sources.
Form
We create a simple form that lets us send a message using the send_data() function.
We also add a Reconnect button, which calls reconnect_to_server() and lets us restart the connection in case of an error.
<form id="theform" >
    <p>Name: <input id="author" type="text" value="" /></p>
    <p>Message: <input id="message" type="text" value="" /></p>
    <p>
        <input type="submit" value="Send" onclick="return send_data()"/>
        <input disabled="disabled"
               id='reconnect'
               type='submit'
               value='Reconnect'
               onclick='return reconnect_to_server()' >
    </p>
    <p id='error_message'></p>
</form>
JavaScript
Now let's look at our JavaScript code.
First we create a global JsonRPC object for the URL /chat, with the two methods get and post:
rpc = new JsonRPC('/chat',['get'],['post']);
Note that the first array is the list of methods and the second is the list of notifications.
Next we create a message counter, as in the previous example:
message_count = 0;
Then we setup our callbacks:
rpc.get.on_result = function(messages) {
    var messagesHtml = document.getElementById('messages');
    for(var i=0;i<messages.length;i++) {
        var m=messages[i];
        messagesHtml.innerHTML+='<dt>' + m.author +'</dt>' +
                                '<dd>' + m.message + '</dd>';
        message_count++;
    }
    restart();
}
When the get call completes successfully, the callback updates the "messages" element with the new data we received and calls restart() to start the next request. Note that m.author and m.message are inserted without HTML escaping, so a real application should escape them first.
The restart function is:
function restart()
{
    rpc.get(message_count);
}
It makes an asynchronous call to the get method.
We also set up an on_error callback for our method.
If an error occurs we disable the chat (we do not call restart(), and we enable the Reconnect button) and show an error message to the user with make_error, a helper that is not shown here:
rpc.get.on_error = function(e) {
    make_error('Getting New Messages',e);
    document.getElementById('reconnect').disabled = false;
}
In the same way we set up callbacks for the post method:
rpc.post.on_result = function() {
    // reset the form content
    document.getElementById("message").value = '';
}
rpc.post.on_error = function(e) {
    make_error('Posting New Messages',e);
}
We have seen that when we press the "Send" button, the send_data() function is called.
We define it this way:
function send_data() {
    var author = document.getElementById('author').value;
    var message = document.getElementById("message").value;
    rpc.post(author,message);
    return false;
}
Note: rpc.post is an asynchronous RPC call with two parameters.
And last but not least, the function that allows us to reconnect to the web server:
function reconnect_to_server()
{
    message_count = 0;
    document.getElementById('error_message').innerHTML = '';
    document.getElementById('messages').innerHTML = '';
    document.getElementById('reconnect').disabled = true;
    restart();
    return false;
}
We clear all the content we received, reset the message counter, and start long polling once again.
That's it.
Full Code
You can find the full code here
