Asio小结

Proactor模式

proactor模式组件关系图

事件驱动应用程序使用Proactor模式,能够高效的分离和分派由异步操作完成而触发的服务请求,从而发挥并发的性能优势并且不会产生某些副作用。

在分布式系统中,基于事件驱动的应用程序,尤其是服务器应用程序,通常可以通过异步处理多个服务请求来提升性能。异步服务完成时,应用程序必须马上处理由操作系统传递过来的相应的完成事件,表明异步计算已结束。要想有效地支持上述异步模型,需要解决下列四个问题:

  • 为了提高可伸缩性,减少等待时间,应用程序必须能同时处理多个完成事件,而且不允许持续时间较长的操作过度延迟其他操作的处理。
  • 为了使处理能力最大化,需要避免任何不必要的上下文切换、同步以及CPU之间的数据搬移。
  • 集成新服务或改善已有服务,对现有的完成事件分离及分派机制只需进行最小化的改动。
  • 对应用程序代码最大限度地屏蔽多线程和同步机制的复杂性。

我们将应用程序分为两个部分:异步执行、持续时间较长的操作和完成处理程序,当操作结束时,由完成操作处理程序处理这些操作的结果。接操作完成事件的分离与分派集成在一起,异步操作结束时,操作完成事件传递并分派给操作完成事件处理程序进行处理。

在Proactor中会进行如下协作:

proactor模式组件协作时序图

  • 作为发起者的应用程序组件在特定句柄上调用异步操作处理程序的异步操作。除了传递数据给异步操作外,发起者还将特定的操作完成处理参数,如操作完成的处理程序或句柄,传递给操作完成事件队列。异步操作处理程序在内部保存上述参数,以备稍后使用。
  • 发起者调用了异步操作处理程序的操作后,发起者和调用者的操作可以独立运行。当其他操作并发执行时,发起者甚至可以调用新的异步操作。如果异步操作需要从远端应用程序接受服务请求,异步操作处理程序会延迟该操作,直到收到服务请求。当与期望的服务请求相关的事件发生时,异步操作就会结束执行。
  • 当异步操作结束执行时,异步操作处理程序产生操作完成事件,改事件中包含了异步操作完成的结果。最初在句柄上调用了异步操作,然后异步操作处理程序将该事件插入到该剧并相关的操作事件完成队列中。
  • 当准别好处理由它的异步操作引发的操作完成事件时,应用程序调用前摄器事件处理循环的入口方法,我们称之为handle_events()。该方法调用异步事件分离程序等待异步操作处理程序将操作完成事件插入到队列中。在操作完成事件从队列中移除后,前摄器的handle_events()将其分离到对应的操作处理完成程序中,然后再分派操作完成处理程序中合适的钩子方法,将异步操作的结果传递给钩子方法。
  • 具体操作完成处理程序之后处理接收到的操作完成结果。如果操作完成吹程序向调用者返回结果,则可能存在两种可能的情况。第一种情况,处理异步操作的结果的完成程序就是之前异步操作的发起者。在这种情况下,操作完成处理程序自己就是调用者,所以它不需要作额外的工作来返回操作结果。第二种情况,远端的应用程序或应用程序内部的组件可能参与了该异步操作。在这种情况下操作完成处理程序需要在其传输句柄上调用异步读写操作,向远端的应用程序返回操作结果。

结束处理后,操作完成程序可以调用其他的异步操作,此时整个周期将重新开始循环。

Michael Caisse在cppcon2016上用一个有趣的故事来详细说明了Proactor模式中各个组件的职责以及它们只之间是如何交互的。

asio基本用法

asio中的异步操作过程如下:

async_op1
  1. 应用程序调用IO对象对连接操作进行初始化(如设置socket连接成功时的回调函数):socket.async_connect(server_endpoint, your_completion_handler);your_completion_handler函数的签名为:void your_completion_handler(const boost::system::error_code& ec);
  2. IO对象将请求转发给io_service
  3. io_service通知操作系统其需要开始一个异步连接.
async_op2
  1. 操作系统指示连接操作完成, io_service从队列中获取操作结果.
  2. 应用程序必须调用io_service::run()(或io_service相似的成员函数)以便于接收结果.调用io_service::run()会阻塞未完成的异步操作,因此可在启动第一个异步操作后调用这个函数.
  3. 调用io_service::run()后,io_service返回一个操作结果,并将其翻译为error_code,传递到完成事件处理器中.

在进行通信时,asio使用buffer来管理读写内存,包括const_buffermutabel_buffer,它们相当于某块内存的非拥有指针,定义如下

1
2
using const_buffer=std::tuple<const void*, size_t>;
using mutable_buffer=std::tuple<void*, size_t>;

我们可以通过asio::buffer(...)函数来构造所需的buffer,示例:

1
2
3
4
5
std::array<uint_8,4> head={0xba, 0xbe, 0xfa, 0xce};
std::string msg{"CppCon "}; 
std::vector<uint_8> data(256);
std::vector<asio::const_buffer> bufs{asio::buffer(head),asio::buffer(msg),asio::buffer(data)};
socket_.send(bufs);

某些事件有隔离性的要求,例如,我们不希望在同一时刻有多个线程对同一个fd进行写操作使用asio::strand对象可以保证事务的隔离性,它可以使我们在不做额外的同步操作的情况下保证某个completion handler从开始到完成的过程中不会由于调度被其他其他线程打断。例如下面的代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
void timer_expired(std::string id){
	std::cout << now_time << " " << id << " enter.\n";
	std::this_thread::sleep_for(std::chrono::seconds(3));
	std::cout << now_time << " " << id << " leave.\n";
}
int main(){
	asio::io_service service;
	asio::io_service::strand strand(service);
	asio::deadline_timer timer1(service, posix_time::seconds(5));
	asio::deadline_timer timer2(service, posix_time::seconds(5));
	timer1.async_wait(
		strand.wrap( [](auto ... vn){ timer_expired("timer1"); } )
	);
	timer2.async_wait(
		strand.wrap( [](auto ... vn){ timer_expired("timer2"); } )
	);
	std::thread ta( [&](){service.run();} );
	std::thread tb( [&](){service.run();} );
	ta.join(); tb.join();
	std::cout << "done.\n";
}

将会输出

1
2
3
4
5
Tue Sep 20 23:43:49 2016 timer1 enter.
Tue Sep 20 23:43:52 2016 timer1 leave.
Tue Sep 20 23:43:52 2016 timer2 enter.
Tue Sep 20 23:43:55 2016 timer2 leave.
done.

如果不使用asio::strand对象进行包裹,则不会有这种保证,其可能有如下输出

1
2
3
4
5
6
7
8
9
TTuuee SSeepp 2200 2233::2211::2233 22001166

ttiimmeerr21 eenntteerr..

TTuuee SSeepp 2200 2233::2211::2266 22001166

ttiimmeerr12 lleeaavvee..

done.

用在某个服务端的发送函数中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void start_packet_send(){
	send_packet_queue.front() += "\0";
	auto self=shared_from_this()
	async_write( socket_, asio::buffer(send_packet_queue.front())
					, write_strand_.wrap( 
						[self]( system::error_code const & ec, std::size_t){
							sefl->packet_send_done(ec);
						}
					)
				);
}

async_connect()用于设置连接成功时的回调函数,其用法如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
using namespace boost;

asio::io_service io_service;
asio::ip::tcp::socket socket(io_service);
auto address=asio::ip::tcp::endpoint(asio::ip::address::from_string("127.0.0.1"),port);
asio::async_connect(socket,address,connec_handler);
io_service.run();

void connect_handler(const boost::system::error_code& ec){
    if(ec){
        std::cout<<ec.message()<<std::endl;
        //handle error
        return ;
    }
    //do something
}

async_read()用于发起异步读并设置读事件完成时调用的回调函数,它需要一个buffer作为读数据的缓冲区。

1
2
3
4
5
asio::io_service io_service;
asio::ip::tcp::socket socket(io_service);
std::string str="hello, world";
asio::async_read(socket,address,asio::buffer(str.c_str(),str.length()+1),read_handler);
io_service.run();

asio异步发送时,注意不能连续调用异步发送端口asio::async_write(),因为asio::async_wrtie()内部不断调用asio::async_write_some(),直到所有数据发完为止。由于asio::async_write()在调用之后就返回了,如果第一次发送一个交大的包,第二次发送一个较小的包,可能会使得两次发送的数据交织在一起。解决方案是用一个发送缓冲区,在异步发送完成之后从缓冲区中再去下一个数据包发送。具体实现见下例。

用asio实现server/client

该通信程序能实现下列功能:服务端监听某个端口,允许多个客户端连接,服务端将客户端发送的数据打印出来。异步接收使用asio::acceptor::async_accept(),它接收一个socket和一个完成事件回调函数。注意socket不允许复制,不能能直接放入容器,需要包装一层。

读写事件处理器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
//RHhandler.hpp
#pragma once
#include <boost/asio.hpp>
#include <array>
#include <functional>
#include <algorithm>
#include <iostream>
#include "Message.hpp"
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
const size_t MAX_IP_PACK_SIZE = 10000;
const size_t HEAD_LEN = 4;
class RWHandler :public std::enable_shared_from_this<RWHandler> {
public:
	RWHandler(io_service& ios) :m_sock(ios) {}
	~RWHandler() {}
	void handleread() {
		auto self = shared_from_this();
        //接收包头并解析包体长度
		async_read(m_sock, buffer(m_readmsg.data(), HEAD_LEN), 
			[this, self](const boost::system::error_code& ec, size_t sz) {
			if (ec||!m_readmsg.decond_header()) {
				handleError(ec);
				return;
			}
			readBody();//接收包体
			}
		);
	}
	void readBody() {
		auto self = shared_from_this();
		async_read(m_sock, buffer(m_readmsg.body(), m_readmsg.body_length()),
			[this, self] (const boost::system::error_code& ec,size_t sz){
			if (ec) {
				handleError(ec);
				return;
			}
            //收到完成数据包,进行回调
			callback(m_readmsg.data(), m_readmsg.length());
			//发起下一次异步读
            handleread();
		}
		);
	}

	void handlewrite(char* data,size_t len) {
		boost::system::error_code ec;
		write(m_sock, buffer(data, len), ec);
		if (ec) {
			handleError(ec);
		}
	}
	void closeSocket() {
		boost::system::error_code ec;
		m_sock.shutdown(tcp::socket::shutdown_both, ec);
		m_sock.close(ec);
	}
	void setconnid(int id) {
		m_connid = id;
	}
	int getconnid() { return m_connid; }
	template<typename T>
	void setcallerror(T f) {
		m_callerror = f;
	}
	tcp::socket& getsocket() { return m_sock; }
	void callback(char* pdata, size_t l) {
		std::cout << pdata +HEAD_LEN<< std::endl;
	}


private:
	void handleError(const boost::system::error_code& ec) {
		closeSocket();
		std::cout << ec.message() << std::endl;
		if (m_callerror) {
			m_callerror(m_connid);
		}
	}
private:
	tcp::socket m_sock;
	std::array<char, MAX_IP_PACK_SIZE> m_buff;
	int m_connid;
	std::function<void(int)> m_callerror;
	Message m_readmsg;

};

在RWHandler中通过shared_from_this()返回this指针,保证异步操作时原对象的生命周期不会结束,在回调函数返回时还是有效的。为解决TCP粘包问题,使用Message收发数据,Message包含消息长度与消息内容两个字段,接收时,首先解析包头,再接收这个包头中长度的数据,确保收到一个完整的数据包。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//Message.hpp
#pragma once
#include <algorithm>
class Message {
public:
	enum { header_length = 4 };
	enum { max_body_lenght = 512 };

	Message() :body_length_(0) {}
	const char* data()const { return data_; }
	char* data() { return data_; }
	size_t length()const { return header_length + body_length_; }
	const char* body()const { return data_ + header_length; }
	char* body() { return data_ + header_length; }
	size_t body_length()const { return body_length_; }
	void body_length(size_t new_length) {
		body_length_ = std::min<size_t>(new_length, max_body_lenght);
	}
	bool decond_header() {
		char header[header_length + 1] = "";
		std::strncat(header, data_, header_length);
		body_length_ = std::atoi(header) - header_length;
		if (body_length_ > max_body_lenght) {
			body_length_ = 0;
			return false;
		}
		return true;
	}
	void encode_header() {
		char header[header_length + 1] = "";
		std::sprintf(header, "%4d", body_length_);
		std::memcpy(data_, header, header_length);
	}

private:
	char data_[header_length + max_body_lenght];
	size_t body_length_;
};
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
//Server.hpp
#pragma once
#include "Message.hpp"
#include "RWHandler.hpp"
#include <boost/asio.hpp>
#include <unordered_map>
#include <list>
#include <algorithm>
#include <numeric>

const int MAXCONNCTION = 65536;
const int MAXRECVSIZE = 65536;
class Server {
public:
	Server(io_service& ios_,short port):ios(ios_),m_ac(ios,tcp::endpoint(tcp::v4(),port)),
		 pool(MAXCONNCTION){
		pool.resize(MAXCONNCTION);
		std::iota(pool.begin(), pool.end(), 1);
	}
	~Server() {}
	void accept() {
		std::cout << "Start listening..." << std::endl;
		std::shared_ptr<RWHandler> handler = createhandler();
		m_ac.async_accept(handler->getsocket(),
          [this, handler](const boost::system::error_code& ec) {
			if (ec) {
				std::cout << ec.value() << " " << ec.message() << std::endl;
				handleacperror(handler, ec);
				return;
			}
			m_handlers.insert(std::make_pair(handler->getconnid(), handler));
			std::cout << "current connect count :" << m_handlers.size() << std::endl;
			//std::cout << "the message is from " << handler->getconnid() <<" : "<< std::endl;
			handler->handleread();
			accept();
		});
	}

private:
	void handleacperror(std::shared_ptr<RWHandler> eventhandler, const boost::system::error_code& ec) {
		std::cout << "Error , error reason: " << ec.value() << ec.message() << std::endl;
		eventhandler->closeSocket();
		stopaccept();
	}
	void stopaccept() {
		boost::system::error_code ec;
		m_ac.cancel(ec);
		m_ac.close(ec);
		ios.stop();
	}
	std::shared_ptr<RWHandler> createhandler() {
		int id = pool.front();
		pool.pop_front();
		std::shared_ptr<RWHandler> p = std::make_shared<RWHandler>(ios);
		p->setconnid(id);
		p->setcallerror([this](int id) {
			recycleid(id);
		});
		return p;
	}
	void recycleid(int id){
		auto it = find(pool.begin(),pool.end(),id);
		if (it != pool.end())pool.erase(it);
		std::cout << "current connect count :" << m_handlers.size() << std::endl;
		pool.push_back(id);
	}

private:
	io_service& ios;
	tcp::acceptor m_ac;
	std::unordered_map<int, std::shared_ptr<RWHandler>> m_handlers;
	std::list<int> pool;
};

void testforserver() {
	io_service io;
	Server s(io, 9900);
	s.accept();
	io.run();
}

客户端除实现读写功能外,还能自动重连。下面用连接器实现连接与I/O事件处理,用专门的线程进行重连操作,在连接失败或读写发生错误后,会关闭连接并尝试重连。连接器连接成功后,发起了一个异步读操作,它的作用除了接收数据外,还可以用来判断连接是否断开,因为当连接断开时,异步接收事件会触发,据此可做重连操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
//Connector.hpp
#pragma once
#include <boost/asio.hpp>
#include <algorithm>
#include <thread>
#include <string>
#include <chrono>
#include "RWHandler.hpp"
using namespace boost::asio;
using namespace boost::asio::ip;
using namespace boost;
#define _CRT_SECURE_NO_WARNINGS
class Connector {
public:
	Connector(io_service& ios_, const std::string& ip, short port) :ios(ios_),m_sock(ios_),
		m_serveraddr(tcp::endpoint(address::from_string(ip), port)), is_connected(false), m_ck(nullptr) {
		createhandler();
	}
	~Connector() {}
	bool start() {
		m_eventhandler->getsocket().async_connect(m_serveraddr, 
           [this](const boost::system::error_code& ec) {
			if (ec) {
				handleConnecterror(ec);
				return false;
			}
			std::cout << "connect ok" << std::endl;
			is_connected = true;
			m_eventhandler->handleread();
		});
		std::this_thread::sleep_for(std::chrono::seconds(2));
		return is_connected;
	}
	bool isConnected()const { return is_connected; }
	void send(char* data, size_t l) {
		if (!is_connected) {
			return;
		}
		m_eventhandler->handlewrite(data,l);
	}
private:
	void createhandler() {
		m_eventhandler = std::make_shared<RWHandler>(ios);
		m_eventhandler->setcallerror([this](int id) {handleRWerror(id); });
	}

	void checkConnect() {
		if (m_ck) {
			return;
		}
		m_ck = std::make_shared<std::thread>([this] {
			while (1) {
				if (!isConnected())start();
				std::this_thread::sleep_for(std::chrono::seconds(1));
			}
		});
	}
	void handleConnecterror(const boost::system::error_code& ec) {
		is_connected = false;
		std::cout << ec.message() << std::endl;
		m_eventhandler->closeSocket();
		checkConnect();
	}
	void handleRWerror(int id) {
		is_connected = false;
		checkConnect();
	}

private:
	io_service& ios;
	tcp::socket m_sock;
	tcp::endpoint m_serveraddr;
	std::shared_ptr<RWHandler> m_eventhandler;
	bool is_connected;
	std::shared_ptr<std::thread> m_ck;
};

void testforclient() {
	io_service io;
	boost::asio::io_service::work wk(io);
	std::thread t1([&io] {io.run(); });
	Connector c1(io, "127.0.0.1", 9900);
	c1.start();
	std::string str;
	if (!c1.isConnected()) {
		return;
	}

	const size_t l = 512;
	char line[l] = "";
	while (std::cin >> str) {
		char header[HEAD_LEN] = "";
		size_t total = str.size() +1 + HEAD_LEN;
		std::sprintf(header, "%4d", total);
		memcpy(line, header, HEAD_LEN);
		memcpy(line + HEAD_LEN, str.c_str(), str.size() + 1);
		c1.send(line, total);
	}
}