Main  /  Edit version 3  /  Edit version 4  /   /  Users Area

Difference "Asynchronous I/O" ver. 3 versus ver. 4

Content:

<!--toc-->
## Introduction
CppCMS is build around a central asynchronous event loop. All incoming connections are handled, there, all asynchronous
applications use the event loop.
Even though we have seen how to write a simple asynchronous
applications, it is a good idea to get familiar with
asynchronous programming and with [booster::aio](/cppcms_ref_v0_99/namespacebooster_1_1aio.html)
namespace concepts and classes.
We will show a simple example of using the event loop
by creating a simple TCP echo server.
## The Event Loop
The central concept is the `booster::aio::io_service` class
that handles all I/O events.
All classes that want to handle various events
asynchronously should use it. You can access the
central `cppcms::service`'s `io_service` by accessing
`cppcms::service::get_io_service()` member function.
In our simple case we would create our own event loop:
int main()
{
try {
booster::aio::io_service srv;
srv.run();
}
catch(std::exception const &e) {
std::cerr<<e.what()<<std::endl;
}
}
Note: `run()` will not exit until someone calls `srv.stop()`.
## First Use
So let's create our first very simple asynchronous
object that would allow us to stop the service by pressing
"Enter" on the keyboard.
So let's create an object that can use `io_service`
booster::aio::basic_io_device stdin_device(srv);
Then we attach the standard input file descriptor to it:
stdin_device.attach(0);
Note: `attach(int fd)` function only tells the `stdin_device` to use the file descriptor but not
to own it. i.e. It would not close it on the destruction.
Then we define a handler that should be called when
the device become readable:
stdin_device.on_readable(
std::bind(&booster::aio::io_service::stop,&srv)
);
Note: C++11 `std::bind` creates a callback object that
calls `stop` member function of the `srv` object. If you don't have `C++11` compiler you can use `boost::bind`
or `std::tr1::bind`.
Now lets print the notice:
std::cout << "Press any key to stop" << std::endl;
And we got our first "stoppable" service:
try {
booster::aio::io_service srv;
booster::aio::basic_io_device stdin_device(srv);
stdin_device.attach(0);
std::cout << "Press any key to stop" << std::endl;
stdin_device.on_readable(std::bind(&booster::aio::io_service::stop,&srv));
srv.run();
}
catch(std::exception const &e) {
std::cerr<<e.what()<<std::endl;
}
When the user presses "Enter" it the 0 file descriptor
becomes readable and `srv.stop()` will be called
to stop our program.
**Note:** this sample is valid only on POSIX OS where
all file descriptors are `select`able. It would
not work on Windows or Cygwin.
This is the method we can use for integrating any
application that supports "polling" to `booster::aio`
event loop.
## Echo Server
### Classes
We would need to use two classes:
1. The object responsible for accepting
new connections - `echo_acceptor`
2. The object responsible for handing specific TCP/IP
echo session: `echo_session`
### Echo Session Scratch
Let's show a scratch for echo session:
class echo_session :
public booster::enable_shared_from_this<echo_session> {
public:
echo_session(booster::aio::io_service &s) :
socket_(s)
{
}
void run();
...
private:
...
friend class echo_acceptor;
booster::aio::stream_socket socket_;
};
It has a constructor that receives `io_service` as
a parameter. It has a `socket_` member - the
class that is capable to handle general `STREAM_SOCK`
operations.
It also has a `run()` member function that
tells it to start working.
Note. use use `enable_shared_from_this` support and
we will see how we use it later.
### Acceptor
Now back to the acceptor.
class echo_acceptor {
public:
...
private:
booster::aio::acceptor acceptor_;
booster::shared_ptr<echo_session> new_session_;
};
It has two members:
1. The acceptor object - the socket that is used to accept
the connections
2. The pointer to newly created `echo_session` object
that would be responsible for starting the connection.
The constructor of the object would prepare the
socket for accepting new connections:
echo_acceptor(booster::aio::io_service &srv) : acceptor_(srv)
{
booster::aio::endpoint ep("0.0.0.0",8080);
acceptor_.open(ep.family());
acceptor_.set_option(booster::aio::acceptor::reuse_address,true);
acceptor_.bind(ep);
acceptor_.listen(10);
}
1. First we create a `endpoint` object - IP and port we
want to bind to.
2. We open it using `end_point`'s family - i.e. `AF_INET`
`AF_INET6` or `AF_UNIX`. Such code would allow as to write
the same code for IPv5, IPv6 and Unix domain sockets.
For example changing
booster::aio::endpoint ep("0.0.0.0",8080);
To
booster::aio::endpoint ep("/tmp/mysock");
Would switch entire code to use Unix domain sockets.
3. We setup the reuse option, bind and start listening
on the port.
Now let's create our main asynchronous function
void run()
{
new_session_.reset(new echo_session(acceptor_.get_io_service()));
acceptor_.async_accept(new_session_->socket_,
std::bind(&echo_acceptor::on_accepted,this,std::placeholders::_1));
}
First we create a new `session_object` using `acceptor_`'s
`io_service` and start asynchronous accepting.
We provide two parameters:
1. The socket we accept the connection into `new_session_->socket_`
Note the `socket_` object must remain valid till
the operation completes.
2. The callback that should be executed upon event completion. We use `std::bind` to connect the `on_accepted`
member function.
void on_accepted(booster::system::error_code const &e)
{
if(e) {
std::cout << e.message() << std::endl;
run();
}
else {
new_session_->run();
run();
}
}
This function receives a single parameter - the `error_code`
Each completion handler requires a error code as first
parameter. We ignored it in the case of `stdin_device`
but in general would should always use it.
If error occurred `if(w)` we report the error
and restart the asynchronous operation once again.
Alternatively we could call
acceptor_.get_io_service().stop();
And abort the program execution.
In case of success, the `socket_` now holds a valid
opened TCP/IP connection and we call the
`new_session_->run();` to let the object to deal with it.
Now the session object itself would be responsive on his
own life time, we can safely restart the service
and create new `echo_session` object for new incoming
connection.
### Echo Session
Now let's go back to our `echo_session` object:
First we add a buffer that would hold a temporary
data received from the client:
private:
char buffer_[1024];
Now let's define our `run()` member function:
void run()
{
socket_.async_read_some(
booster::aio::buffer(buffer_,sizeof(buffer_)),
std::bind(&echo_session::on_read,shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}
1. We tell the socket to read at most `sizeof buffer_` bytes to the `buffer_` area.
`buffer_` should remain valid until operation completes
2. We provide a callback `on_read` that would keep a reference pointer to "this" by using `shared_from_this()`
function and receive two parameters: error code and the
size of the data that was transfered.
Note: using `shared_from_this()` allows us to keep the
object alive as long as some callbacks exist.
If there is no more callbacks to execute, the reference
counter would go to 0, the object will be destroyed
and the socket would be closed.
The `on_read` member function would look like this:
void on_read(booster::system::error_code const &e,size_t tr)
{
if(e) return;
socket_.async_write(booster::aio::buffer(buffer_,tr),
std::bind(&echo_session::on_written,shared_from_this(),std::placeholders::_1));
}
If a error occurs (EOF is error as well) we just
exit and stop execution. The session object would
be destroyed. Otherwise we call `socket_.async_write`.
Unlike `async_read_some` or `async_write_some` it ensures
that _all_ the data is transfered which makes the
programming easier. So in the same way we send
all the data out and call `on_written` on the
completion.
Because we use `async_write` and not `async_write_some`
we do not care about the number of bytes transfered
as it should be as the size of the data we received - `tr`.
And finally when we receive write completion notification
we restart reading once again in same manner
void on_written(booster::system::error_code const &e)
{
if(e) return;
run();
}
### Event Loop Integration
In order to setup echo server we add to our main function
the following code:
echo_acceptor acc(srv);
acc.run();
Before we call `srv.run()`
So the final main function looks like:
try {
booster::aio::io_service srv;
booster::aio::basic_io_device stdin_device(srv);
stdin_device.attach(0);
std::cout << "Press any key to stop" << std::endl;
stdin_device.on_readable(std::bind(&booster::aio::io_service::stop,&srv));
echo_acceptor acc(srv);
acc.run();
srv.run();
}
catch(std::exception const &e) {
std::cerr<<e.what()<<std::endl;
}
## Booster.Aio vs Boost.Asio
If you are familiar with Boost.Asio, you will
find that `booster::aio` is very similar.
Indeed it was strongly influenced by Boost.Asio
but it has several key differences:
1. It has less features the Boost.Asio and it is much simpler.
2. It uses OOP rather then template meta-programming.
For example making the code above to work with
Unix domain sockets with Boost.Asio you had to
make everything template based.
3. It always uses Reactor under the hood which allows
you to attach as socket without actually passing the
ownership to the object which makes it much
easier integration with 3rd part libraries.
On the other hand it does not use IOCP under Windows.
4. It does not allow to call `io_service::run()` from
several threads simultaneously.
5. `io_service::run()` runs till `stop()` is called and
not until there is no more "work" to do.
6. It is much more fork friendly.
There are more differences but if you are familiar
with Boost.Asio it would be easy for you to use
<!--toc-->
## Introduction
CppCMS is build around a central asynchronous event loop. All incoming connections are handled, there, all asynchronous
applications use the event loop.
Even though we have seen how to write a simple asynchronous
applications, it is a good idea to get familiar with
asynchronous programming and with [booster::aio](/cppcms_ref_v0_99/namespacebooster_1_1aio.html)
namespace concepts and classes.
We will show a simple example of using the event loop
by creating a simple TCP echo server.
## The Event Loop
The central concept is the `booster::aio::io_service` class
that handles all I/O events.
All classes that want to handle various events
asynchronously should use it. You can access the
central `cppcms::service`'s `io_service` by accessing
`cppcms::service::get_io_service()` member function.
In our simple case we would create our own event loop:
int main()
{
try {
booster::aio::io_service srv;
srv.run();
}
catch(std::exception const &e) {
std::cerr<<e.what()<<std::endl;
}
}
Note: `run()` will not exit until someone calls `srv.stop()`.
## First Use
So let's create our first very simple asynchronous
object that would allow us to stop the service by pressing
"Enter" on the keyboard.
So let's create an object that can use `io_service`
booster::aio::basic_io_device stdin_device(srv);
Then we attach the standard input file descriptor to it:
stdin_device.attach(0);
Note: `attach(int fd)` function only tells the `stdin_device` to use the file descriptor but not
to own it. i.e. It would not close it on the destruction.
Then we define a handler that should be called when
the device become readable:
stdin_device.on_readable(
std::bind(&booster::aio::io_service::stop,&srv)
);
Note: C++11 `std::bind` creates a callback object that
calls `stop` member function of the `srv` object. If you don't have `C++11` compiler you can use `boost::bind`
or `std::tr1::bind`.
Now lets print the notice:
std::cout << "Press any key to stop" << std::endl;
And we got our first "stoppable" service:
try {
booster::aio::io_service srv;
booster::aio::basic_io_device stdin_device(srv);
stdin_device.attach(0);
std::cout << "Press any key to stop" << std::endl;
stdin_device.on_readable(std::bind(&booster::aio::io_service::stop,&srv));
srv.run();
}
catch(std::exception const &e) {
std::cerr<<e.what()<<std::endl;
}
When the user presses "Enter" it the 0 file descriptor
becomes readable and `srv.stop()` will be called
to stop our program.
**Note:** this sample is valid only on POSIX OS where
all file descriptors are `select`able. It would
not work on Windows or Cygwin.
This is the method we can use for integrating any
application that supports "polling" to `booster::aio`
event loop.
## Echo Server
### Classes
We would need to use two classes:
1. The object responsible for accepting
new connections - `echo_acceptor`
2. The object responsible for handing specific TCP/IP
echo session: `echo_session`
### Echo Session Scratch
Let's show a scratch for echo session:
class echo_session :
public booster::enable_shared_from_this<echo_session> {
public:
echo_session(booster::aio::io_service &s) :
socket_(s)
{
}
void run();
...
private:
...
friend class echo_acceptor;
booster::aio::stream_socket socket_;
};
It has a constructor that receives `io_service` as
a parameter. It has a `socket_` member - the
class that is capable to handle general `STREAM_SOCK`
operations.
It also has a `run()` member function that
tells it to start working.
Note. use use `enable_shared_from_this` support and
we will see how we use it later.
### Acceptor
Now back to the acceptor.
class echo_acceptor {
public:
...
private:
booster::aio::acceptor acceptor_;
booster::shared_ptr<echo_session> new_session_;
};
It has two members:
1. The acceptor object - the socket that is used to accept
the connections
2. The pointer to newly created `echo_session` object
that would be responsible for starting the connection.
The constructor of the object would prepare the
socket for accepting new connections:
echo_acceptor(booster::aio::io_service &srv) : acceptor_(srv)
{
booster::aio::endpoint ep("0.0.0.0",8080);
acceptor_.open(ep.family());
acceptor_.set_option(booster::aio::acceptor::reuse_address,true);
acceptor_.bind(ep);
acceptor_.listen(10);
}
1. First we create a `endpoint` object - IP and port we
want to bind to.
2. We open it using `end_point`'s family - i.e. `AF_INET`
`AF_INET6` or `AF_UNIX`. Such code would allow as to write
the same code for IPv5, IPv6 and Unix domain sockets.
For example changing
booster::aio::endpoint ep("0.0.0.0",8080);
To
booster::aio::endpoint ep("/tmp/mysock");
Would switch entire code to use Unix domain sockets.
3. We setup the reuse option, bind and start listening
on the port.
Now let's create our main asynchronous function
void run()
{
new_session_.reset(new echo_session(acceptor_.get_io_service()));
acceptor_.async_accept(new_session_->socket_,
std::bind(&echo_acceptor::on_accepted,this,std::placeholders::_1));
}
First we create a new `session_object` using `acceptor_`'s
`io_service` and start asynchronous accepting.
We provide two parameters:
1. The socket we accept the connection into `new_session_->socket_`
Note the `socket_` object must remain valid till
the operation completes.
2. The callback that should be executed upon event completion. We use `std::bind` to connect the `on_accepted`
member function.
void on_accepted(booster::system::error_code const &e)
{
if(e) {
std::cout << e.message() << std::endl;
run();
}
else {
new_session_->run();
run();
}
}
This function receives a single parameter - the `error_code`
Each completion handler requires a error code as first
parameter. We ignored it in the case of `stdin_device`
but in general would should always use it.
If error occurred `if(w)` we report the error
and restart the asynchronous operation once again.
Alternatively we could call
acceptor_.get_io_service().stop();
And abort the program execution.
In case of success, the `socket_` now holds a valid
opened TCP/IP connection and we call the
`new_session_->run();` to let the object to deal with it.
Now the session object itself would be responsive on his
own life time, we can safely restart the service
and create new `echo_session` object for new incoming
connection.
### Echo Session
Now let's go back to our `echo_session` object:
First we add a buffer that would hold a temporary
data received from the client:
private:
char buffer_[1024];
Now let's define our `run()` member function:
void run()
{
socket_.async_read_some(
booster::aio::buffer(buffer_,sizeof(buffer_)),
std::bind(&echo_session::on_read,shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}
1. We tell the socket to read at most `sizeof buffer_` bytes to the `buffer_` area.
`buffer_` should remain valid until operation completes
2. We provide a callback `on_read` that would keep a reference pointer to "this" by using `shared_from_this()`
function and receive two parameters: error code and the
size of the data that was transfered.
Note: using `shared_from_this()` allows us to keep the
object alive as long as some callbacks exist.
If there is no more callbacks to execute, the reference
counter would go to 0, the object will be destroyed
and the socket would be closed.
The `on_read` member function would look like this:
void on_read(booster::system::error_code const &e,size_t tr)
{
if(e) return;
socket_.async_write(booster::aio::buffer(buffer_,tr),
std::bind(&echo_session::on_written,shared_from_this(),std::placeholders::_1));
}
If a error occurs (EOF is error as well) we just
exit and stop execution. The session object would
be destroyed. Otherwise we call `socket_.async_write`.
Unlike `async_read_some` or `async_write_some` it ensures
that _all_ the data is transfered which makes the
programming easier. So in the same way we send
all the data out and call `on_written` on the
completion.
Because we use `async_write` and not `async_write_some`
we do not care about the number of bytes transfered
as it should be as the size of the data we received - `tr`.
And finally when we receive write completion notification
we restart reading once again in same manner
void on_written(booster::system::error_code const &e)
{
if(e) return;
run();
}
### Event Loop Integration
In order to setup echo server we add to our main function
the following code:
echo_acceptor acc(srv);
acc.run();
Before we call `srv.run()`
So the final main function looks like:
try {
booster::aio::io_service srv;
booster::aio::basic_io_device stdin_device(srv);
stdin_device.attach(0);
std::cout << "Press any key to stop" << std::endl;
stdin_device.on_readable(std::bind(&booster::aio::io_service::stop,&srv));
echo_acceptor acc(srv);
acc.run();
srv.run();
}
catch(std::exception const &e) {
std::cerr<<e.what()<<std::endl;
}
## Booster.Aio vs Boost.Asio
If you are familiar with Boost.Asio, you will
find that `booster::aio` is very similar.
Indeed it was strongly influenced by Boost.Asio
but it has several key differences:
1. It has less features the Boost.Asio and it is much simpler.
2. It uses OOP rather then template meta-programming.
For example making the code above to work with
Unix domain sockets with Boost.Asio you had to
make everything template based.
3. It always uses Reactor under the hood which allows
you to attach as socket without actually passing the
ownership to the object which makes it much
easier integration with 3rd part libraries.
On the other hand it does not use IOCP under Windows.
4. It does not allow to call `io_service::run()` from
several threads simultaneously.
5. `io_service::run()` runs till `stop()` is called and
not until there is no more "work" to do.
6. It is much more fork friendly.
There are more differences but if you are familiar
with Boost.Asio it would be easy for you to use
`booster::aio`.

Sidebar:

## Related
- [booster::aio reference manual](/cppcms_ref_v0_99/namespacebooster_1_1aio.html)
## Related
- [booster::aio reference manual](/cppcms_ref_v0_99/namespacebooster_1_1aio.html)
← [Advanced Caching](http://cppcms.com/wikipp/en/page/cppcms_1x_adv_caching)
[Secure Programming with CppCMS](http://cppcms.com/wikipp/en/page/secure_programming)→

About

CppCMS is a web development framework for performance demanding applications.

Support This Project

SourceForge.net Logo

Поддержать проект

CppCMS needs You


Navigation

Main Page


Valid CSS | Valid XHTML 1.0