
Non Blocking IO

CppCMS 1.1 introduced non-blocking I/O, which improves on the basic asynchronous I/O.

Problem

We have a simple application that broadcasts the same message to multiple clients:

  ...
  void on_new_conn()
  {
     ...
     conns_.insert(release_context());
  }
  void broadcast(std::string const &msg)
  {
      using cppcms::http::context;
      for(auto ctx :conns_) {
         ctx->response().out() << msg;
         // Every write goes through the event loop, even for a few bytes
         ctx->async_flush_output(
            [this,ctx](context::completion_type c){
               // Put the context back once the write has completed
               if(c==context::operation_completed) {
                 this->conns_.insert(ctx);
               }
            }
         );
      }
      conns_.clear();
  }
  std::set<booster::shared_ptr<cppcms::http::context> > conns_;
  ...

So, to send a message of a few bytes, a write that would most likely complete immediately, you copy the data to intermediate buffers, create a callback object for the event loop, and insert the context back into the wait queue.

This can be a very costly operation.

Non Blocking API

To address this, CppCMS 1.1 adds a non-blocking API to cppcms::http::response.

full_asynchronous_buffering(bool enable) - enables or disables full buffering. When it is disabled, output is sent as soon as the buffer becomes full or the output stream is flushed.

Now, if an I/O operation would block, the pending output is stored, and you can detect this case by calling the pending_blocked_output() member function. In that case you can fall back to the standard context::async_flush_output(callback) and wait until the stream becomes ready.
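To make these semantics concrete, here is a minimal standalone model. It is not CppCMS code: MockResponse, its capacity parameter (standing in for the free space in the socket's send buffer) and drain() are hypothetical names invented for this sketch. Only the idea behind pending_blocked_output() is taken from the API described above.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a response in non-blocking mode (NOT the CppCMS class).
class MockResponse {
public:
    // capacity: bytes the simulated socket accepts before it "would block"
    explicit MockResponse(std::size_t capacity) : capacity_(capacity) {}

    // Try to send immediately; whatever does not fit is stored as pending output.
    void write(std::string const &data)
    {
        if(!pending_.empty()) {
            // Earlier output is still blocked: queue behind it to keep ordering
            pending_ += data;
            return;
        }
        std::size_t n = std::min(capacity_, data.size());
        sent_.append(data, 0, n);
        capacity_ -= n;
        pending_.assign(data, n, std::string::npos);
    }

    // Mirrors the idea of response::pending_blocked_output()
    bool pending_blocked_output() const { return !pending_.empty(); }

    // Simulates the socket becoming writable again, which is the event
    // that an asynchronous flush would wait for.
    void drain(std::size_t more_capacity)
    {
        capacity_ += more_capacity;
        std::string rest;
        rest.swap(pending_);
        write(rest);
    }

    std::string const &sent() const { return sent_; }

private:
    std::size_t capacity_;
    std::string sent_;
    std::string pending_;
};
```

In this model a write that fits goes out at once and pending_blocked_output() stays false, so no callback is needed. Only when part of the data is left over does the caller have to wait for the stream to become ready.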

Our example becomes a little more complex:

  void on_new_conn()
  {
     ...
     response().full_asynchronous_buffering(false);
     conns_.insert(release_context());
  }
  void broadcast(std::string const &msg)
  {
      using cppcms::http::context;
      auto pctx = conns_.begin();
      while(pctx!=conns_.end())
      {
         // Advance the iterator first so that erasing ctx below keeps it valid
         auto ctx = *pctx++;
         ctx->response().out() << msg << std::flush;
         if(!ctx->response().out()) {
            conns_.erase(ctx); 
            continue;
         }
         if(!ctx->response().pending_blocked_output())
             continue;
         conns_.erase(ctx);
         ctx->async_flush_output(
            [=](context::completion_type c){
               if(c==context::operation_completed) {
                 this->conns_.insert(ctx);
               }
            }
         );
      }
  }
  std::set<booster::shared_ptr<cppcms::http::context> > conns_;
  ...

First we disable full output buffering so that I/O operations on response::out() can actually generate traffic.

ctx->response().out() << msg << std::flush;

Then, during the broadcast, we write the data and flush the stream. We check whether an error occurred on the stream (for example, the remote client disconnected) and, if so, remove the context from the list of usable contexts.

 if(!ctx->response().out()) {
    conns_.erase(ctx); 
    continue;
 }

Then, if all the data was written (pending_blocked_output()==false), we can continue; no callbacks are needed.

 if(!ctx->response().pending_blocked_output())
     continue;

And finally, if there is some data still waiting to be sent, we fall back to the usual asynchronous callbacks:

 conns_.erase(ctx);
 ctx->async_flush_output(
    [=](context::completion_type c){
       if(c==context::operation_completed) {
         this->conns_.insert(ctx);
       }
    }
 );

This makes the code more complicated, because blocked operations must be handled explicitly, but in general this approach provides much higher performance, especially when many small I/O operations are required.
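The control flow of the broadcast loop can also be tried out without a server. The sketch below is a simplified model, not CppCMS code: Conn, its room and failed fields, Broadcaster, and the waiting list (standing in for contexts parked in async_flush_output) are all hypothetical. It assumes only what the walkthrough above describes: drop on error, continue on a complete write, and defer to a callback when output is pending.

```cpp
#include <cstddef>
#include <memory>
#include <set>
#include <string>
#include <vector>

// Hypothetical connection: 'room' is how many bytes it takes without blocking.
struct Conn {
    std::size_t room;
    bool failed;
    std::string sent, pending;

    explicit Conn(std::size_t r, bool f = false) : room(r), failed(f) {}

    // Returns false on a stream error (e.g. the client disconnected).
    bool write(std::string const &msg)
    {
        if(failed)
            return false;
        std::size_t n = msg.size() < room ? msg.size() : room;
        sent.append(msg, 0, n);
        room -= n;
        pending.append(msg, n, std::string::npos); // leftover, if any
        return true;
    }
};

typedef std::shared_ptr<Conn> conn_ptr;

struct Broadcaster {
    std::set<conn_ptr> ready;      // connections that can be written to directly
    std::vector<conn_ptr> waiting; // stand-in for contexts awaiting a flush callback

    void broadcast(std::string const &msg)
    {
        auto it = ready.begin();
        while(it != ready.end()) {
            conn_ptr c = *it;
            if(!c->write(msg)) {       // error: drop the connection
                it = ready.erase(it);
                continue;
            }
            if(c->pending.empty()) {   // fast path: fully written, no callback
                ++it;
                continue;
            }
            it = ready.erase(it);      // slow path: wait for the socket
            waiting.push_back(c);
        }
    }
};
```

Here the loop uses the iterator returned by std::set::erase instead of advancing before the erase; both keep the iteration valid. Only the connection that could not take the whole message ends up in waiting, which is the point of the non-blocking path: callbacks are created only for connections that actually block.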

Buffering

Additionally, you can set the output buffer size by calling setbuf(int size).

One special case is setting the size to 0: in that case the data is transferred directly, without even copying memory. For example, if you are sending the same prepared data to multiple connections, it is not copied to temporary buffers but is sent directly over the TCP/IP socket to each of the clients.

In our example we send a single std::string as the message, so it fits this case. We need to make a few changes.

First, disable the buffer entirely:
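As an analogy in standard C++ only, the following sketch shows what "no intermediate buffer" means for a stream. It does not show how CppCMS implements setbuf(0): DirectBuf and sent_without_copy are hypothetical names, and the check relies on std::ostream::write forwarding to the stream buffer's xsputn(), which is how the common standard library implementations behave.

```cpp
#include <cstddef>
#include <ostream>
#include <streambuf>
#include <string>

// A stream buffer with no put area: writes reach xsputn() directly, so the
// "device" sees the caller's own memory rather than a copy in a buffer.
class DirectBuf : public std::streambuf {
public:
    DirectBuf() : last_ptr(nullptr), last_len(0), calls(0) {}

    char const *last_ptr;   // address of the data handed to the device
    std::size_t last_len;   // number of bytes handed over
    int calls;              // how many times the device was invoked

protected:
    std::streamsize xsputn(char const *s, std::streamsize n) override
    {
        // A real implementation would pass (s, n) to the socket here.
        last_ptr = s;
        last_len = static_cast<std::size_t>(n);
        ++calls;
        return n;
    }

    int_type overflow(int_type c) override
    {
        // Single characters would arrive here; accepted and ignored in this sketch.
        return traits_type::not_eof(c);
    }
};

// Writes msg through an unbuffered stream and reports whether the device
// received msg's own storage in a single call.
bool sent_without_copy(std::string const &msg)
{
    DirectBuf buf;
    std::ostream out(&buf);
    out.write(msg.data(), static_cast<std::streamsize>(msg.size()));
    return buf.calls == 1
        && buf.last_ptr == msg.data()
        && buf.last_len == msg.size();
}
```

With a non-empty buffer the data would first be copied into the put area and handed to the device later, which is the copy that a zero-size buffer avoids when the same prepared message goes to many clients.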

 response().full_asynchronous_buffering(false);
 response().setbuf(0); // no buffering
 conns_.insert(release_context());

Then we don't need to call std::flush on the output at all:

ctx->response().out() << msg;

But it does not hurt either.
