asio服务器模式:异步调用

DinS          Written on 2017/10/30

本文介绍如何使用asio搭建异步模式的服务器,会有一定难度,确保阅读过《asio基础使用方法》并完全理解了其内容。

Asio本身提供了一种异步模式来做服务器。因为是异步的所以理解和使用起来都有一定难度,但是一旦掌握会发挥强大效果。为了说明,我们必须从最简单的例子开始试验,逐步理解Asio异步模式。

注:c++20标准后asio提供了协程的方式,可以用同步的代码形式实现异步逻辑。现在推荐使用协程而不是异步方式构建服务器。文章可见这里

一、回调函数

回调函数是异步模式的关键。所谓回调函数就是异步任务完成后自动执行的函数。
下面看一段试验代码来理解回调函数。

大图点击这里

为了便于说明,在关键代码部分插入了输出。
首先我们定义了一个函数叫TestHandler,用于回调。
准备socket都是一样的,略过。
关键性区别在于AcceptClient调用了async_accept而不是阻塞的accept,注意到第二个参数是刚刚定义的回调函数。
然后我们执行myIOService.run(),其作用注释中已说明。

整理一下思路:使用了异步连接,如果稍后有客户端连接进来,TestHandler会被自动调用,于是应该输出一段话,让我们看看效果。

这是打开服务端后显示界面。

打开客户端后显示界面。

也就是说程序没有在async_accept处阻塞,在myIOService.run处阻塞。
但是此处阻塞不影响async_accept接收客户端连接,客户端连接后TestHandler确实被调用了,我们看到输出了一句话。
这个回调函数结束后myIOService阻塞结束,主线程结束。

二、使用bind增加参数

回调函数的返回值和参数必须如上,但是实际应用中肯定不够,还有许多参数需要传递给回调函数,怎么破?
使用bind。Bind这个东西定义在头文件<functional>中,可能你比较生疏,一般也不会用到它,在这里可以理解为把参数打包,但是调用形式仍然不变。
下面看更改的代码:

只给出了更改的部分。bind的第一个参数指明函数,后面依次是传入参数,这里传入了一个8。
看看结果:

数字8确实传入了回调函数,这证明我们成功了。

三、使用shared_ptr和连锁回调实现异步accept

掌握了上面两个基本概念和用法,来看看实际问题:如何使用异步方法实现accept。
显然这里有两个问题:一是如何让异步任务不断继续下去,二是如何传递已连接socket。
对于第一个问题,可以在回调函数最后再次调用自身,这样形成一个连锁,就可以继续下去。
对于第二个问题,socket类无法直接移动到回调函数中,需要使用到shared_ptr。
为什么是shared_ptr?看了下面的代码后回答。

大图点击这里

代码并不长,而且没有while,那么是如何做到不断accept的呢?虽然不长,但是能说的东西很多,依次道来。
首先注意到的是把io_service和acceptor声明成全局变量了,这是为了方便回调函数使用。这里如此处理是为了方便理解,真正的服务端会使用类来包装。

之后先看main,这里没有直接声明一个socket对象,而是使用了shared_ptr动态分配内存指向了一个socket类,用于存放连接socket。异步accept的回调函数需要传入连接socket,这样才能进行后续操作。但是为什么不直接传入socket对象而是使用指针?
本人亲自试过传入socket对象,报错缺少boost,然后使用了std::move,依然报错。这样看来只能使用指针了,估计是Asio底层实现的考虑不让复制socket对象,由此引发一个问题:既然传入指针那么new\delete即可,为什么用智能指针?
无脑的回答是应该使用智能指针代替new\delete,这适用于所有情况。但是这里还有更加重要的原因要使用智能指针,如果不使用则程序无法正确运行。

为了理解这句话,再来看回调函数的代码。
回调函数里输出了传入socket的地址,没什么太多可说的。重要的是下面的部分:再次使用智能指针申请了一个socket,然后再次调用异步accept,回调函数是回调函数本身,传入socket是新申请的socket。
虽然只有3行,但是包涵的信息量非常大。
咱们可以脑内模拟一下程序的运行状况,当有客户端连入后,回调函数被调用,输出客户端信息,然后再次异步accept,绑定了同样的回调函数,返回。
于是一旦下一个客户端连接,同样的故事再次上演。这样子相当于连锁回调,回调函数内部有异步accept,达到了不断监听的作用。

那么为什么用智能指针呢?
因为异步任务不会阻塞,直接返回。如果使用普通的指针,则异步accept后函数返回,则声明的socket就清理了。
一旦后面有客户端连进来,再找刚才声明的socket,内容已经空了,当然就会出问题。这种情况下只能使用智能指针,即使函数返回了,但是异步accept绑定的回调函数依然指向socket,所以不会清理。

用一张图来表示逻辑可能更直观。

四、异步收(发)信息

仅仅做到不断异步accept还不够,我们还需要异步收发信息。
但是这里需要明确一点,异步收信息绝对有必要,因为这会阻塞进程。但是异步发信息的意义不大,一般来说调用send不会失败,而且如果真是异步发信息,则待发送的信息也需要用shared_ptr,这个无疑增加了代码阅读难度。
另外async_write这个异步发送内部实现的某些原因,连续调用异步发送可能导致发送的信息纠缠在一起,具体可见http://www.cnblogs.com/qicosmos/p/3487169.html
综合起来,推荐异步accept和异步recv和同步send。

但是此时会碰到一个问题:shared_ptr不支持动态数组,也就是说recv用的char[]无法处理。因此我们需要对整个代码的逻辑进行改造才可以。
有了上面的基础,理解下面的代码应该稍微容易一些。为了便于说明,并非按照代码顺序而是按照理解顺序给出代码片段。最后会给出所有代码。

这部分是头文件,没什么太多好说的。

先看main。逻辑上很简单,建立了一个io_service对象,构造了一个服务器,开始异步任务。外面套一个try-catch。
于是核心问题就是服务器Server类是如何工作的。

大图点击这里

Server类并不复杂,其成员是一个acceptor用于监听客户端连接,一个socket用于存放已连接socket。构造函数直接初始化列表把成员初始化,然后调用DoAccept()。
于是核心问题就变为DoAccept是干什么的。DoAccept看起来很复杂,其实就一行代码,即async_accept()。之所以复杂是因为这里的回调函数使用了lambda直接构造。
再看看lamba里干什么了。捕获列表是自身this,参数是error_code以满足回调函数统一规定,实质性代码是两句话:首先造了一个智能指针指向Session类,其次再次调用DoAccept()。
第二个容易理解,连锁回调之前已经遇到过了。第一个是动态申请,然后调用Session类的start成员函数。于是问题又变成Session类是干什么的。
Session类是用于处理socket收发信息的类,从这点讲上面的写法跟开辟线程的思路有点类似,但是动态申请毕竟不是多线程,所以可以避免多线程带来的麻烦。
接下来看看Session类是如何工作的。

大图点击这里

这个类比Server类要复杂,但是也可以理解。
有两个成员,一个是socket,用于存放已连接socket,另一个是char[],存放数据。构造函数就是直接把成员socket初始化为已连接socket。这里特意加了一个析构函数,是为了说明接下来要遇到的问题。
还有两个私有函数,一个是RecvData(),用于接收数据,看起来也很复杂但实际上也是一句话:async_receive,只不过回调函数是lambda。奇怪的是这里的lambda参数中除了error_code外还必须有一个int,否则报错,原因不明,照着写即可。
回调函数是WriteData(),这个函数没什么特别的,是同步send。之前说过异步send容易出问题。
于是Session的逻辑就是接收到已连接socket,并异步接收信息,一旦接收到信息就同步发送信息。
如果为了循环收发可以在RecvData()里再次手动调用RecvData形成连锁调用,当然具体的收发顺序取决于通信约定。
可以看到两个私有函数里都插入了输出,这是为了说明接下来要遇到的问题。

那么问题是什么呢?先来运行一下这个程序试试看:

启动服务端后什么都不显示,正常。

打开一个客户端后,服务端这样显示,很明显不太对,显示了Session done表明这个Session已经析构了。

如果此时在客户端输入信息,会显示错误,不奇怪因为服务端的已连接socket析构了。

再打开一个客户端,同样的故事,好消息是异步accept是成功了。

问题出在哪里?问题出在shared_ptr上。
在DoAccept里面我们用make_shared造了一个智能指针,然后调用了Session对象的start。根据输出,start是调用成功了,并且成功执行了RecvData(),并且RecvData()也执行完毕了。
但是记住我们是在DoAccept的回调函数里申请shred_ptr的,客户端连接后调用回调函数,然后再次调用DoAccept后这次的DoAccept就返回了。同时在Session里的RecvData里也没有增加shared_ptr的引用计数,因为并没有直接用到shared_ptr。于是在DoAccept返回后申请的shared_ptr引用计数为0,自动释放。
于是我们就看到了Session done,即析构函数被调用。但是我们希望的是这个Session对象在通信结束前一直存在。换句话说,我们要在RecvData里给shared_ptr的引用计数增加,这样就不会自动释放。

为了达到这个目的,需要用到c++中的高级特性。
99%的读者应该都没有用过这个特性,实际上我也是在接触asio后才知道有这么一个东西的存在,这个东西叫shared_from_this()。
直接看更改后的代码,结合讲解。

点这里看大图

为了在类中使用shared_from_this,我们需要从基类中派生出自己的类,这就是public enable_shared_from_this<Session>的用意。
之后我们在RecvData中先声明一个self,这个就是指向类自己的shared_ptr。为什么不能直接使用shared_ptr<T>(this)?因为这会创建出一个所有权独立的shared_ptr,达不到我们的目的,原来的shared_ptr该析构还会析构。

有了这个self,再把它放到lambda即回调函数的捕获列表中,这样原来的shared_ptr引用计数就会+1,即使RecvData返回了也不会析构掉。于是当客户端发送信息过来后可以找到对应的已连接socket,通信就正常了。
赶快看看效果吧。

当客户端连接后,RecvData调用,析构函数没有调用,有戏。

客户端输入信息后,服务端反映正常。只不过因为没有初始化m_data,所以出现了乱码。

五、异步模式服务器代码(全)

下面修补一下bug,把计数功能再完善一下,全部代码如下。

(大图点这里)

运行结果如下:

终于成功了!

小结一下asio的异步模式。
这个东西有些难使用,尤其是第一次接触时难以理解,还有lambda和shared_from_this,不过这些东西以套路居多。
一旦掌握asio异步模式后确实能够发挥巨大的效果,既避免了多线程的潜在风险,又没有非阻塞的while(1)不断轮询占用系统资源,而且能够处理的socket理论上是没有限制的。

六、总结

Asio提供了使用方便、功能强大的网络通信对象,搭建起socket只需要几行代码,而且错误处理机制也相当简便。阻塞+多线程模式和非阻塞+轮询模式在Asio下也非常简单。Asio还提供了自己独有的异步模式,虽然一开始有些难以使用,但是掌握后能发挥巨大作用。

一个小问题是为什么没有select、epoll等多路复用模型?
这个已经被Asio做到底层了,会根据操作系统调用效率最高的函数,不需要程序员担心,实际上availble()函数本身应该就是用这个实现的(个人猜测)。