Main  /  Edit  /  History  /   /  Users Area

Asynchronous I/O

Introduction

CppCMS is build around a central asynchronous event loop. All incoming connections are handled, there, all asynchronous applications use the event loop.

Even though we have seen how to write a simple asynchronous applications, it is a good idea to get familiar with asynchronous programming and with booster::aio namespace concepts and classes.

We will show a simple example of using the event loop by creating a simple TCP echo server.

The Event Loop

The central concept is the booster::aio::io_service class that handles all I/O events. All classes that want to handle various events asynchronously should use it. You can access the central cppcms::service's io_service by accessing cppcms::service::get_io_service() member function.

In our simple case we would create our own event loop:

int main()
{
    try {
        booster::aio::io_service srv;
        srv.run();
    }
    catch(std::exception const &e) {
        std::cerr<<e.what()<<std::endl;
    }
}

Note: run() will not exit until someone calls srv.stop().

First Use

So let's create our first very simple asynchronous object that would allow us to stop the service by pressing "Enter" on the keyboard.

So let's create an object that can use io_service

booster::aio::basic_io_device stdin_device(srv);

Then we attach the standard input file descriptor to it:

stdin_device.attach(0);

Note: attach(int fd) function only tells the stdin_device to use the file descriptor but not to own it. i.e. It would not close it on the destruction.

Then we define a handler that should be called when the device become readable:

stdin_device.on_readable(
  std::bind(&booster::aio::io_service::stop,&srv)
);

Note: C++11 std::bind creates a callback object that calls stop member function of the srv object. If you don't have C++11 compiler you can use boost::bind or std::tr1::bind.

Now lets print the notice:

std::cout << "Press any key to stop" << std::endl;

And we got our first "stoppable" service:

try {
    booster::aio::io_service srv;

    booster::aio::basic_io_device stdin_device(srv);
    stdin_device.attach(0);
    std::cout << "Press any key to stop" << std::endl;
    stdin_device.on_readable(std::bind(&booster::aio::io_service::stop,&srv));
    srv.run();
}
catch(std::exception const &e) {
    std::cerr<<e.what()<<std::endl;
}

When the user presses "Enter" it the 0 file descriptor becomes readable and srv.stop() will be called to stop our program.

Note: this sample is valid only on POSIX OS where all file descriptors are selectable. It would not work on Windows or Cygwin.

This is the method we can use for integrating any application that supports "polling" to booster::aio event loop.

Echo Server

Classes

We would need to use two classes:

  1. The object responsible for accepting new connections - echo_acceptor
  2. The object responsible for handing specific TCP/IP echo session: echo_session

Echo Session Scratch

Let's show a scratch for echo session:

class echo_session : 
  public booster::enable_shared_from_this<echo_session> {
public:
    echo_session(booster::aio::io_service &s) :
        socket_(s)
    {
    }

    void run();
    ...
private:
    ...
    friend class echo_acceptor;
    booster::aio::stream_socket socket_;
};

It has a constructor that receives io_service as a parameter. It has a socket_ member - the class that is capable to handle general STREAM_SOCK operations.

It also has a run() member function that tells it to start working.

Note: use enable_shared_from_this support and we will see how we use it later.

Acceptor

Now back to the acceptor.

class echo_acceptor {
public:
    ...
private:
    booster::aio::acceptor acceptor_;
    booster::shared_ptr<echo_session> new_session_;
};

It has two members:

  1. The acceptor object - the socket that is used to accept the connections
  2. The pointer to newly created echo_session object that would be responsible for starting the connection.

The constructor of the object would prepare the socket for accepting new connections:

echo_acceptor(booster::aio::io_service &srv) : acceptor_(srv)
{
    booster::aio::endpoint ep("0.0.0.0",8080);
    acceptor_.open(ep.family());
    acceptor_.set_option(booster::aio::acceptor::reuse_address,true);
    acceptor_.bind(ep);
    acceptor_.listen(10);
}
  1. First we create a endpoint object - IP and port we want to bind to.
  2. We open it using end_point's family - i.e. AF_INET AF_INET6 or AF_UNIX. Such code would allow as to write the same code for IPv5, IPv6 and Unix domain sockets.

    For example changing

      booster::aio::endpoint ep("0.0.0.0",8080);
    

    To

      booster::aio::endpoint ep("/tmp/mysock");
    

    Would switch entire code to use Unix domain sockets.

  3. We setup the reuse option, bind and start listening on the port.

Now let's create our main asynchronous function

void run()
{
    new_session_.reset(new echo_session(acceptor_.get_io_service()));
    acceptor_.async_accept(new_session_->socket_,
        std::bind(&echo_acceptor::on_accepted,this,std::placeholders::_1));
}

First we create a new session_object using acceptor_'s io_service and start asynchronous accepting.

We provide two parameters:

  1. The socket we accept the connection into new_session_->socket_

    Note the socket_ object must remain valid till the operation completes.

  2. The callback that should be executed upon event completion. We use std::bind to connect the on_accepted member function.

    void on_accepted(booster::system::error_code const &e) { if(e) { std::cout << e.message() << std::endl; run(); } else { new_session_->run(); run(); } }

This function receives a single parameter - the error_code

Each completion handler requires a error code as first parameter. We ignored it in the case of stdin_device but in general would should always use it.

If error occurred if(w) we report the error and restart the asynchronous operation once again. Alternatively we could call

acceptor_.get_io_service().stop();

And abort the program execution.

In case of success, the socket_ now holds a valid opened TCP/IP connection and we call the new_session_->run(); to let the object to deal with it.

Now the session object itself would be responsive on its own life time, we can safely restart the service and create new echo_session object for new incoming connection.

Echo Session

Now let's go back to our echo_session object:

First we add a buffer that would hold a temporary data received from the client:

private:
  char buffer_[1024];

Now let's define our run() member function:

void run()
{
    socket_.async_read_some(
        booster::aio::buffer(buffer_,sizeof(buffer_)),
            std::bind(&echo_session::on_read,shared_from_this(),
                    std::placeholders::_1,
                    std::placeholders::_2));

}
  1. We tell the socket to read at most sizeof buffer_ bytes to the buffer_ area.

    buffer_ should remain valid until operation completes

  2. We provide a callback on_read that would keep a reference pointer to "this" by using shared_from_this() function and receive two parameters: error code and the size of the data that was transfered.

Note: using shared_from_this() allows us to keep the object alive as long as some callbacks exist.

If there is no more callbacks to execute, the reference counter would go to 0, the object will be destroyed and the socket would be closed.

The on_read member function would look like this:

void on_read(booster::system::error_code const &e,size_t tr)
{
    if(e) return;

    socket_.async_write(booster::aio::buffer(buffer_,tr),
            std::bind(&echo_session::on_written,shared_from_this(),std::placeholders::_1));
}

If a error occurs (EOF is error as well) we just exit and stop execution. The session object would be destroyed. Otherwise we call socket_.async_write.

Unlike async_read_some or async_write_some it ensures that all the data is transferred which makes the programming easier. So in the same way we send all the data out and call on_written on the completion.

Because we use async_write and not async_write_some we do not care about the number of bytes transferred as it should be as the size of the data we received - tr.

And finally when we receive write completion notification we restart reading once again in same manner

void on_written(booster::system::error_code const &e)
{
    if(e) return;
    run();
}

Event Loop Integration

In order to setup echo server we add to our main function the following code:

echo_acceptor acc(srv);
acc.run();

Before we call srv.run()

So the final main function looks like:

try {
    booster::aio::io_service srv;

    booster::aio::basic_io_device stdin_device(srv);
    stdin_device.attach(0);
    std::cout << "Press any key to stop" << std::endl;
    stdin_device.on_readable(std::bind(&booster::aio::io_service::stop,&srv));

    echo_acceptor acc(srv);
    acc.run();

    srv.run();
}
catch(std::exception const &e) {
    std::cerr<<e.what()<<std::endl;
}

Booster.Aio vs Boost.Asio

If you are familiar with Boost.Asio, you will find that booster::aio is very similar.

Indeed it was strongly influenced by Boost.Asio but it has several key differences:

  1. It has less features the Boost.Asio and it is much simpler.

  2. It uses OOP rather then template meta-programming.

    For example making the code above to work with Unix domain sockets with Boost.Asio you had to make everything template based.

  3. It always uses Reactor under the hood which allows you to attach as socket without actually passing the ownership to the object which makes it much easier integration with 3rd party libraries.

    On the other hand it does not use IOCP under Windows.

  4. It does not allow to call io_service::run() from several threads simultaneously.

  5. io_service::run() runs till stop() is called and not until there is no more "work" to do.

  6. It is much more fork friendly.

There are more differences but if you are familiar with Boost.Asio it would be easy for you to use booster::aio.


Advanced Caching | Top | Secure Programming with CppCMS

About

CppCMS is a web development framework for performance demanding applications.

Support This Project

SourceForge.net Logo

Поддержать проект

CppCMS needs You


Navigation

Main Page



Valid CSS | Valid XHTML 1.0