Asynchronous I/O
Introduction
CppCMS is built around a central asynchronous event loop. All incoming connections are handled there, and all asynchronous applications use the same event loop.
Even though we have already seen how to write simple asynchronous applications, it is a good idea to get familiar with asynchronous programming and with the concepts and classes of the booster::aio namespace.
We will demonstrate the event loop by creating a simple TCP echo server.
The Event Loop
The central concept is the booster::aio::io_service class, which handles all I/O events. All classes that want to handle events asynchronously should use it. You can access the io_service of the central cppcms::service by calling the cppcms::service::get_io_service() member function.
In our simple case we would create our own event loop:
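    #include <booster/aio/io_service.h>
    #include <exception>
    #include <iostream>

    int main()
    {
        try {
            booster::aio::io_service srv;
            srv.run(); // blocks until stop() is called
        }
        catch(std::exception const &e) {
            std::cerr<<e.what()<<std::endl;
        }
    }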
Note: run() will not exit until someone calls srv.stop().
First Use
Let's create our first, very simple asynchronous object: one that allows us to stop the service by pressing "Enter" on the keyboard.
First we create an object that can use the io_service:
booster::aio::basic_io_device stdin_device(srv);
Then we attach the standard input file descriptor to it:
stdin_device.attach(0);
Note: the attach(int fd) function only tells stdin_device to use the file descriptor, not to own it, i.e. the descriptor will not be closed when stdin_device is destroyed.
Then we define a handler that should be called when the device becomes readable:
stdin_device.on_readable( std::bind(&booster::aio::io_service::stop,&srv) );
Note: C++11 std::bind creates a callback object that calls the stop member function of the srv object. If you don't have a C++11 compiler you can use boost::bind or std::tr1::bind.
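To see what such a bound callback does without pulling in the library, here is a minimal sketch in standard C++. The names fake_service and handler_type are hypothetical stand-ins (they are not part of booster), and an int plays the role of the error code. The sketch assumes, as the later sections describe, that completion handlers are passed an error code: a bind expression without placeholders simply discards it, while an equivalent lambda has to declare the unused parameter.

```cpp
#include <cassert>
#include <functional>

// Hypothetical stand-in for booster::aio::io_service; only stop() is modelled.
struct fake_service {
    bool stopped = false;
    void stop() { stopped = true; }
};

// Hypothetical handler type: one argument standing in for the error code.
typedef std::function<void(int)> handler_type;

// The bind expression has no placeholders, so the argument passed by the
// caller is silently discarded and stop() is invoked on srv.
// srv must outlive the returned handler.
inline handler_type make_stop_handler(fake_service &srv)
{
    return std::bind(&fake_service::stop, &srv);
}

// Equivalent lambda: the unused parameter has to be declared explicitly.
inline handler_type make_stop_lambda(fake_service &srv)
{
    return [&srv](int /*error*/) { srv.stop(); };
}
```

Both handlers can be invoked with any error value; in either case the only effect is the call to stop().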
Now let's print the notice:
std::cout << "Press any key to stop" << std::endl;
And we have our first "stoppable" service:
    try {
        booster::aio::io_service srv;
        booster::aio::basic_io_device stdin_device(srv);
        stdin_device.attach(0); // file descriptor 0 is standard input; it is not owned
        std::cout << "Press any key to stop" << std::endl;
        stdin_device.on_readable(std::bind(&booster::aio::io_service::stop,&srv));
        srv.run();
    }
    catch(std::exception const &e) {
        std::cerr<<e.what()<<std::endl;
    }
When the user presses "Enter", file descriptor 0 becomes readable and srv.stop() is called to stop our program.
Note: this sample is valid only on POSIX operating systems, where all file descriptors are selectable. It would not work on Windows or Cygwin.
This is the method we can use to integrate any application that supports "polling" into the booster::aio event loop.
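The idea of waiting for a descriptor to become readable and then running a handler can be illustrated with plain POSIX poll(). The following is only a sketch: mini_loop is a hypothetical miniature reactor written for this illustration, not how booster::aio is implemented, and a pipe stands in for standard input so that the example does not need a terminal.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <functional>
#include <poll.h>
#include <unistd.h>
#include <vector>

// Hypothetical miniature reactor: run() polls the registered descriptors and
// calls the matching handler until stop() is requested.
class mini_loop {
public:
    mini_loop() : stopped_(false) {}

    // Register a handler for fd; the descriptor is not owned and never closed here.
    void on_readable(int fd, std::function<void()> handler)
    {
        struct pollfd p;
        p.fd = fd;
        p.events = POLLIN;
        p.revents = 0;
        fds_.push_back(p);
        handlers_.push_back(handler);
    }

    void stop() { stopped_ = true; }

    void run()
    {
        while (!stopped_) {
            int n = ::poll(fds_.data(), fds_.size(), -1); // block until an event
            if (n < 0) {
                if (errno == EINTR)
                    continue; // interrupted by a signal, poll again
                break;        // unexpected error, give up
            }
            for (std::size_t i = 0; i < fds_.size() && !stopped_; ++i) {
                if (fds_[i].revents & POLLIN)
                    handlers_[i]();
            }
        }
    }

private:
    bool stopped_;
    std::vector<struct pollfd> fds_;
    std::vector<std::function<void()> > handlers_;
};

// Demonstration: a pipe plays the role of standard input. Returns true if
// run() came back because the handler called stop().
inline bool loop_stops_on_input()
{
    int fds[2];
    if (::pipe(fds) != 0)
        return false;
    bool called = false;
    mini_loop loop;
    loop.on_readable(fds[0], [&]() { called = true; loop.stop(); });
    bool wrote = ::write(fds[1], "\n", 1) == 1; // simulate pressing Enter
    if (wrote)
        loop.run();
    ::close(fds[0]);
    ::close(fds[1]);
    return wrote && called;
}
```

As in the tutorial, run() in this sketch returns only because a handler called stop(); with no readable descriptor it would keep waiting.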
Echo Server
Classes
We need two classes:
- echo_acceptor - the object responsible for accepting new connections
- echo_session - the object responsible for handling a specific TCP/IP echo session
Echo Session Sketch
Let's sketch the echo session class:
    class echo_session : public booster::enable_shared_from_this<echo_session> {
    public:
        echo_session(booster::aio::io_service &s) :
            socket_(s)
        {
        }
        void run();
        ...
    private:
        ...
        friend class echo_acceptor;
        booster::aio::stream_socket socket_;
    };
It has a constructor that receives an io_service as a parameter. It has a socket_ member, a booster::aio::stream_socket, the class that handles general SOCK_STREAM operations.
It also has a run() member function that tells it to start working.
Note: the class derives from enable_shared_from_this; we will see how it is used later.
Acceptor
Now back to the acceptor.
    class echo_acceptor {
    public:
        ...
    private:
        booster::aio::acceptor acceptor_;
        booster::shared_ptr<echo_session> new_session_;
    };
It has two members:
- The acceptor object - the socket that is used to accept connections
- The pointer to a newly created echo_session object that would be responsible for the next accepted connection
The constructor of the object would prepare the socket for accepting new connections:
    echo_acceptor(booster::aio::io_service &srv) :
        acceptor_(srv)
    {
        booster::aio::endpoint ep("0.0.0.0",8080);
        acceptor_.open(ep.family());
        acceptor_.set_option(booster::aio::acceptor::reuse_address,true);
        acceptor_.bind(ep);
        acceptor_.listen(10);
    }
- First we create an endpoint object - the IP address and port we want to bind to. We open the acceptor using the endpoint's family, i.e. AF_INET, AF_INET6 or AF_UNIX. This allows us to write the same code for IPv4, IPv6 and Unix domain sockets. For example, changing
    booster::aio::endpoint ep("0.0.0.0",8080);
  to
    booster::aio::endpoint ep("/tmp/mysock");
  would switch the entire code to use Unix domain sockets.
- Then we set the reuse_address option, bind, and start listening on the port.
Now let's create our main asynchronous function:
    void run()
    {
        new_session_.reset(new echo_session(acceptor_.get_io_service()));
        acceptor_.async_accept(new_session_->socket_,
            std::bind(&echo_acceptor::on_accepted,this,std::placeholders::_1));
    }
First we create a new echo_session object using the acceptor_'s io_service and start accepting asynchronously.
We provide two parameters:
- The socket we accept the connection into: new_session_->socket_. Note that the socket_ object must remain valid until the operation completes.
- The callback that should be executed upon completion of the operation. We use std::bind to connect the on_accepted member function:
    void on_accepted(booster::system::error_code const &e)
    {
        if(e) {
            // accepting failed: report the error and try again
            std::cout << e.message() << std::endl;
            run();
        }
        else {
            // hand the connection over to the session and accept the next one
            new_session_->run();
            run();
        }
    }
This function receives a single parameter - the error_code. Each completion handler receives an error code as its first parameter. We ignored it in the case of stdin_device, but in general we should always check it.
If an error occurred, if(e), we report the error and restart the asynchronous operation.
Alternatively we could call
    acceptor_.get_io_service().stop();
and abort the program execution.
In case of success, socket_ now holds a valid open TCP/IP connection and we call new_session_->run() to let the session object deal with it.
From now on the session object is responsible for its own lifetime, so we can safely restart accepting and create a new echo_session object for the next incoming connection.
Echo Session
Now let's go back to our echo_session object.
First we add a buffer that holds the data received from the client:
    private:
        char buffer_[1024];
Now let's define our run() member function:
    void run()
    {
        socket_.async_read_some(
            booster::aio::buffer(buffer_,sizeof(buffer_)),
            std::bind(&echo_session::on_read,shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
    }
- We tell the socket to read at most sizeof(buffer_) bytes into the buffer_ area. buffer_ must remain valid until the operation completes.
- We provide a callback, on_read, that keeps a reference-counted pointer to "this" by using the shared_from_this() function and receives two parameters: the error code and the number of bytes transferred.
Note: using shared_from_this() allows us to keep the object alive as long as some callback refers to it. When there are no more callbacks to execute, the reference counter drops to 0, the object is destroyed and the socket is closed.
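The lifetime rule can be observed in isolation with the standard library equivalents std::shared_ptr and std::enable_shared_from_this. This is only a sketch under that substitution: demo_session and the queue that stands in for the event loop are hypothetical names invented for the illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>

// Hypothetical session type; `alive` is an external flag so that the
// destructor can be observed from outside.
class demo_session : public std::enable_shared_from_this<demo_session> {
public:
    explicit demo_session(bool &alive) : alive_(alive) { alive_ = true; }
    ~demo_session() { alive_ = false; }

    // Queue a callback that owns a shared_ptr to this object, the way
    // run() in the tutorial binds shared_from_this() into on_read.
    void schedule(std::queue<std::function<void()> > &pending)
    {
        pending.push(std::bind(&demo_session::on_event, shared_from_this()));
    }

    void on_event() {}

private:
    bool &alive_;
};

// Returns true if the session outlives its creator while a callback is
// pending and is destroyed once that callback is gone.
inline bool session_lives_as_long_as_callback()
{
    bool alive = false;
    std::queue<std::function<void()> > pending; // stand-in for the event loop
    {
        std::shared_ptr<demo_session> s = std::make_shared<demo_session>(alive);
        s->schedule(pending);
    } // the creator's pointer is gone; only the callback holds the session
    bool alive_while_pending = alive;

    pending.front()(); // run the callback
    pending.pop();     // destroying it drops the last reference
    return alive_while_pending && !alive;
}
```

In the echo session the same thing happens implicitly: each handler schedules the next one, so the object stays alive until a handler returns without starting a new operation.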
The on_read member function looks like this:
    void on_read(booster::system::error_code const &e,size_t tr)
    {
        if(e) return; // error or EOF: no new callback, so the session is destroyed
        socket_.async_write(booster::aio::buffer(buffer_,tr),
            std::bind(&echo_session::on_written,shared_from_this(),std::placeholders::_1));
    }
If an error occurs (EOF is an error as well) we just return and stop execution. The session object will be destroyed. Otherwise we call socket_.async_write.
Unlike async_read_some or async_write_some, it ensures that all the data is transferred, which makes programming easier. So we send all the data out and on_written is called on completion.
Because we use async_write and not async_write_some, we do not care about the number of bytes transferred, as it must equal the size of the data we received - tr.
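The difference between "write some" and "write all" is easiest to see in a blocking analogue. The sketch below uses the POSIX write() call, which may transfer fewer bytes than requested, and wraps it in a loop; write_all and pipe_round_trip are hypothetical helper names, and this is not a description of how async_write is implemented, only of the guarantee it gives.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <unistd.h>

// Keeps calling write() until all `size` bytes are sent or an error occurs.
// A single write() may transfer fewer bytes than requested, which is the
// "write some" behaviour; the loop turns it into "write all".
inline bool write_all(int fd, const char *data, std::size_t size)
{
    std::size_t done = 0;
    while (done < size) {
        ssize_t n = ::write(fd, data + done, size - done);
        if (n < 0) {
            if (errno == EINTR)
                continue; // interrupted by a signal, retry
            return false; // real error
        }
        done += static_cast<std::size_t>(n);
    }
    return true;
}

// Round trip through a pipe: writes msg with write_all() and reads it back.
// Intended for short messages that fit into the pipe buffer.
inline std::string pipe_round_trip(const std::string &msg)
{
    int fds[2];
    if (::pipe(fds) != 0)
        return std::string();
    std::string result;
    bool ok = write_all(fds[1], msg.data(), msg.size());
    ::close(fds[1]); // signal EOF to the reader
    if (ok) {
        char buf[64];
        ssize_t n;
        while ((n = ::read(fds[0], buf, sizeof(buf))) > 0)
            result.append(buf, static_cast<std::size_t>(n));
    }
    ::close(fds[0]);
    return result;
}
```

With a "write some" style call the caller would have to track the offset and reissue the operation from the completion handler; a "write all" style call reports completion only once, after the last byte.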
Finally, when we receive the write completion notification, we restart reading in the same manner:
    void on_written(booster::system::error_code const &e)
    {
        if(e) return;
        run();
    }
Event Loop Integration
In order to set up the echo server we add the following code to our main function, before the call to srv.run():
    echo_acceptor acc(srv);
    acc.run();
So the final main function looks like:
    try {
        booster::aio::io_service srv;
        booster::aio::basic_io_device stdin_device(srv);
        stdin_device.attach(0);
        std::cout << "Press any key to stop" << std::endl;
        stdin_device.on_readable(std::bind(&booster::aio::io_service::stop,&srv));
        echo_acceptor acc(srv);
        acc.run();
        srv.run();
    }
    catch(std::exception const &e) {
        std::cerr<<e.what()<<std::endl;
    }
Booster.Aio vs Boost.Asio
If you are familiar with Boost.Asio, you will find that booster::aio is very similar.
Indeed, it was strongly influenced by Boost.Asio, but it has several key differences:
- It has fewer features than Boost.Asio and it is much simpler.
- It uses OOP rather than template meta-programming. For example, to make the code above work with Unix domain sockets under Boost.Asio you would have to make everything template based.
- It always uses a Reactor under the hood, which allows you to attach a socket without passing ownership to the object and makes integration with 3rd party libraries much easier. On the other hand, it does not use IOCP under Windows.
- It does not allow calling io_service::run() from several threads simultaneously.
- io_service::run() runs until stop() is called, not until there is no more "work" to do.
- It is much more fork friendly.
There are more differences, but if you are familiar with Boost.Asio it will be easy for you to use booster::aio.