// CppCMS
// json_rpc_chat/chat.cpp Source File
//                                                                             
//  Copyright (C) 2008-2012  Artyom Beilis (Tonkikh) <artyomtnk@yahoo.com>     
//                                                                             
//  See accompanying file COPYING.TXT file for licensing details.
//
#include <cppcms/application.h>
#include <cppcms/url_dispatcher.h>
#include <cppcms/applications_pool.h>
#include <cppcms/service.h>
#include <cppcms/http_response.h>
#include <cppcms/http_request.h>
#include <cppcms/http_context.h>
#include <cppcms/rpc_json.h>
#include <cppcms/json.h>
#include <booster/intrusive_ptr.h>
#include <booster/aio/deadline_timer.h>
#include <booster/system_error.h>

#include <ctime>
#include <exception>
#include <iostream>
#include <set>
#include <string>
#include <vector>

#include <boost/bind.hpp>
using boost::bind;


class chat : public cppcms::rpc::json_rpc_server {
public:
    chat(cppcms::service &srv) : 
        cppcms::rpc::json_rpc_server(srv),
        timer_(srv.get_io_service())
    {
        // Our main methods
        bind("post",cppcms::rpc::json_method(&chat::post,this),notification_role);
        bind("get",cppcms::rpc::json_method(&chat::get,this),method_role);
        
        // Add timeouts to the system
        last_wake_ = time(0);
        on_timer(booster::system::error_code());
    }

    // Handle new message call
    void post(std::string const &author,std::string const &message)
    {
        cppcms::json::value obj;
        obj["author"]=author;
        obj["message"]=message;
        messages_.push_back(obj);
        broadcast(messages_.size()-1);
    }

    void on_timer(booster::system::error_code const &e)
    {
        if(e) return; // cancelation

        // check idle connections for more then 10 seconds
        if(time(0) - last_wake_ > 10) {
            broadcast(messages_.size());
        }
        // restart timer
        timer_.expires_from_now(booster::ptime::seconds(1));
        timer_.async_wait(boost::bind(&chat::on_timer,booster::intrusive_ptr<chat>(this),_1));
    }

    // Handle request
    void get(unsigned from)
    {
        if(from < messages_.size()) {
            // not long polling - return result now
            return_result(make_response(from));
            return;
        }
        else if(from == messages_.size()) {
            // Can't answer now

            // Add long polling request to the list

            booster::shared_ptr<cppcms::rpc::json_call> call=release_call();
            waiters_.insert(call);

            // set disconnect callback
            call->context().async_on_peer_reset(
                boost::bind(
                    &chat::remove_context,
                    booster::intrusive_ptr<chat>(this),
                    call));
        }
        else {
            return_error("Invalid position");
        }
    }

    // handle client disconnect
    void remove_context(booster::shared_ptr<cppcms::rpc::json_call> call)
    {
        waiters_.erase(call);
    }

    void broadcast(size_t from)
    {
        // update timeout
        last_wake_ = time(0);
        // Prepare response
        cppcms::json::value response = make_response(from);
        // Send it to everybody
        for(waiters_type::iterator waiter=waiters_.begin();waiter!=waiters_.end();++waiter) {
            booster::shared_ptr<cppcms::rpc::json_call> call = *waiter;
            call->return_result(response);
        }
        waiters_.clear();
    }

    // Prepare response to the client
    cppcms::json::value make_response(size_t n)
    {
        cppcms::json::value v;

        // Small optimization
        v=cppcms::json::array();
        cppcms::json::array &ar  = v.array();
        ar.reserve(messages_.size() - n);

        // prepare all messages
        for(size_t i=n;i<messages_.size();i++) {
            ar.push_back(messages_[i]);
        }
        return v;
    }
private:

    // message store
    std::vector<cppcms::json::value> messages_;

    // long poll requests
    typedef std::set<booster::shared_ptr<cppcms::rpc::json_call> > waiters_type;
    waiters_type waiters_;

    // timer for resetting idle requests
    booster::aio::deadline_timer timer_;
    time_t last_wake_;
};


int main(int argc,char **argv)
{
    try {
        cppcms::service service(argc,argv);
        booster::intrusive_ptr<chat> c=new chat(service);
        service.applications_pool().mount(c);
        service.run();
    }
    catch(std::exception const &e) {
        std::cerr<<"Catched exception: "<<e.what()<<std::endl;
        return 1;
    }
    return 0;
}
// vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4