asio之socket的创建和连接

终端节点的创建

  • 所谓终端节点就是用来通信的端对端的节点,可以通过ip地址和端口构造,其的节点可以连接这个终端节点做通信。

  • 如果我们是客户端,我们可以通过对端的ip和端口构造一个endpoint,用这个endpoint和其通信。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    int  client_end_point() {
    // 步骤 1. 假设客户端应用程序已经获得了 IP 地址和协议端口号。
    std::string raw_ip_address = "127.0.0.1";
    unsigned short port_num = 3333;
    // 用于存储在解析原始 IP 地址时发生的错误信息。
    boost::system::error_code ec;
    // 步骤 2. 使用 IP 协议版本无关的地址表示。
    asio::ip::address ip_address =
    asio::ip::address::from_string(raw_ip_address, ec);
    if (ec.value() != 0) {
    // 提供的 IP 地址无效。中断执行。
    std::cout
    << "Failed to parse the IP address. Error code = "
    << ec.value() << ". Message: " << ec.message();
    return ec.value();
    }
    // 步骤 3.
    asio::ip::tcp::endpoint ep(ip_address, port_num);
    // 步骤 4. 端点已准备就绪,可以用来指定网络中客户端希望与之通信的特定服务器。
    return 0;
    }
  • 如果是服务端,则只需根据本地地址绑定就可以生成endpoint

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    int server_end_point(){
    // 步骤1. 这里我们假设服务器应用程序已经获取了协议端口号。
    unsigned short port_num = 3333;

    // 步骤2. 创建 asio::ip::address 类的特殊对象,
    // 该对象指定主机上可用的所有IP地址。注意
    // 这里我们假设服务器在IPv6协议上工作。
    asio::ip::address ip_address = asio::ip::address_v6::any();

    // 步骤3. 创建 asio::ip::tcp::endpoint 实例。
    asio::ip::tcp::endpoint ep(ip_address, port_num);

    // 步骤4. 端点已创建,可以用来
    // 指定服务器应用程序希望监听传入连接的
    // IP地址和端口号。
    return 0;
    }

创建socket

  • socket进行通信必须要一个参数,这个参数就是上下文
  • 上下文是boost的asio的一个核心服务,它的所有的服务都是通过上下文服务来通信的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int create_tcp_socket() {
// 步骤1. 需要 'io_service' 类的实例作为套接字构造函数的参数。
asio::io_context ios;

// 步骤2. 创建 'tcp' 类的对象,该对象表示使用 IPv4 作为底层协议的 TCP 协议。
asio::ip::tcp protocol = asio::ip::tcp::v4();

// 步骤3. 实例化一个活动的 TCP 套接字对象。
asio::ip::tcp::socket sock(ios);

// 用于存储打开套接字时发生的错误信息。
boost::system::error_code ec;

// 步骤4. 打开套接字。
sock.open(protocol, ec);
if (ec.value() != 0) {
// 打开套接字失败。
std::cout
<< "Failed to open the socket! Error code = "
<< ec.value() << ". Message: " << ec.message();
return ec.value();
}
return 0;
}

上述socket只是通信的socket,如果是服务端,我们还需要生成一个acceptor的socket,用来接收新的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int create_acceptor_socket() {
/*
// 步骤1:需要一个'io_service'类的实例作为socket构造函数的参数。
asio::io_context ios;
// 步骤2:创建一个表示TCP协议的'tcp'类对象,使用IPv6作为底层协议。
asio::ip::tcp protocol = asio::ip::tcp::v6();
// 步骤3:实例化一个acceptor socket对象。
asio::ip::tcp::acceptor acceptor(ios);
// 用于存储在打开acceptor socket时发生的错误信息。
boost::system::error_code ec;
// 步骤4:打开acceptor socket。
acceptor.open(protocol, ec);
if (ec.value() != 0) {
// 无法打开socket。
std::cout
<< "无法打开acceptor socket!"
<< "错误代码 = "
<< ec.value() << "。消息:" << ec.message();
return ec.value();
}
*/
// 新版本更简单的方法
asio::ip::tcp::acceptor a(ios,asio::ip::tcp::endpoint(asio::ip::tcp::v4(),3333));

return 0;
}

绑定acceptor

对于acceptor类型的socket,服务器要将其绑定到指定的断点,所有连接这个端点的连接都可以被接收到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int bind_acceptor_socket() {
// 步骤1:假设服务器应用程序已经获取了协议端口号。
unsigned short port_num = 3333;
// 步骤2:创建端点。
asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(), port_num);
// 'acceptor'类构造函数使用。
asio::io_context ios;
// 步骤3:创建并打开一个acceptor socket。
asio::ip::tcp::acceptor acceptor(ios, ep.protocol());
boost::system::error_code ec;
// 步骤4:绑定acceptor socket。
acceptor.bind(ep, ec);
// 处理任何错误。
if (ec.value() != 0) {
// 无法绑定acceptor socket。中断执行。
std::cout << "无法绑定acceptor socket。"
<< "错误代码 = " << ec.value() << "。消息:"
<< ec.message();
return ec.value();
}
return 0;
}

连接指定的端点

作为客户端可以连接服务器指定的端点进行连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int connect_to_end() {
// 步骤1:假设客户端应用程序已经获取了目标服务器的IP地址和协议端口号。
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;
try {
// 步骤2:创建一个指定目标服务器应用程序的端点。
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_context ios;
// 步骤3:创建并打开一个socket。
asio::ip::tcp::socket sock(ios, ep.protocol());
// 步骤4:连接socket。
sock.connect(ep);
// 此时socket 'sock'已经连接到服务器应用程序,可以用于发送或接收数据。
}
// 这里使用了asio::ip::address::from_string()和asio::ip::tcp::socket::connect()
// 的异常重载函数,以处理错误情况。
catch (system::system_error &e) {
std::cout << "发生错误!错误代码 = " << e.code()
<< "。消息:" << e.what();
return e.code().value();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <asio.hpp>
#include <iostream>
#include <string>
int dns_connect_to_end(){
// 定义目标主机和端口
std::string host = "llfc.club";
std::string port_num = "3333";

// 创建 ASIO 的 I/O 上下文对象,所有的异步操作都需要它来运行
asio::io_context ios;

// 创建一个 DNS 查询,参数指定了主机名、端口号,以及服务类型(这里指定为数字服务,意味着 port_num 是一个服务的数字标识)
asio::ip::tcp::resolver::query resolver_query(host, port_num, asio::ip::tcp::resolver::query::numeric_service);

// 创建解析器对象,用于 DNS 解析
asio::ip::tcp::resolver resolver(ios);

try {
// 执行 DNS 解析,返回一个迭代器,指向解析结果的开始
asio::ip::tcp::resolver::iterator it = resolver.resolve(resolver_query);

// 创建一个 TCP 套接字
asio::ip::tcp::socket sock(ios);

// 使用解析出的端点信息来连接套接字
asio::connect(sock, it);
}
catch (asio::system_error &e) { // 捕获可能发生的异常,通常是网络错误或 DNS 解析失败
std::cout << "Error occurred! Error code = " << e.code()
<< ". Message: " << e.what();
// 返回错误代码
return e.code().value();
}
// 若连接成功,返回 0 表示无错误
return 0;
}

服务器接收连接

  • 当有客户端连接时,服务器需要接收连接
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    int accept_new_connection(){
    // 用于存储挂起连接请求的队列大小。
    const int BACKLOG_SIZE = 30;
    // 步骤1:假设服务器应用程序已经获取了协议端口号。
    unsigned short port_num = 3333;
    // 步骤2:创建服务器端点。
    asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(), port_num);
    asio::io_context ios;
    try {
    // 步骤3:实例化并打开一个acceptor socket。
    asio::ip::tcp::acceptor acceptor(ios, ep.protocol());
    // 步骤4:将acceptor socket绑定到服务器端点。
    acceptor.bind(ep);
    // 步骤5:开始监听传入的连接请求。
    acceptor.listen(BACKLOG_SIZE);
    // 步骤6:创建一个活动socket。
    asio::ip::tcp::socket sock(ios);
    // 步骤7:处理下一个连接请求并将活动socket连接到客户端。
    acceptor.accept(sock);
    // 此时'sock' socket已连接到客户端应用程序,可以用于发送或接收数据。
    }
    catch (system::system_error& e) {
    std::cout << "发生错误!错误代码 = " << e.code()
    << "。消息:" << e.what();
    return e.code().value();
    }
    }

asio socket同步读写

关于buffer

  • 任何网络库都有提供buffer的数据结构,所谓buffer就是接收和发送数据时缓存数据的结构。
  • boost::asio提供了asio::mutable_bufferasio::const_buffer这两个结构,他们是一段连续的空间,首字节存储了后续数据的长度
  • asio::mutable_buffer用于写服务,asio::const_buffer用于读服务。但是这两个结构都没有被asio的api直接使用。
  • 对于api的buffer参数,asio提出了MutableBufferSequenceConstBufferSequence概念,他们是由多个asio::mutable_bufferasio::const_buffer组成的。也就是说boost::asio为了节省空间,将一部分连续的空间组合起来,作为参数交给api使用。
  • 我们可以理解为MutableBufferSequence的数据结构为std::vector<asio::mutable_buffer>
    结构如下
image.png
  • 每隔vector存储的都是mutable_buffer的地址,每个mutable_buffer的第一个字节表示数据的长度,后面跟着数据内容。
  • 这么复杂的结构交给用户使用并不合适,所以asio提出了buffer()函数,该函数接收多种形式的字节流,该函数返回asio::mutable_buffers_1 o或者asio::const_buffers_1结构的对象。
  • 如果传递给buffer()的参数是一个只读类型,则函数返回asio::const_buffers_1 类型对象。
  • 如果传递给buffer()的参数是一个可写类型,则返回asio::mutable_buffers_1 类型对象。
  • asio::const_buffers_1asio::mutable_buffers_1asio::mutable_bufferasio::const_buffer的适配器,提供了符合MutableBufferSequenceConstBufferSequence概念的接口,所以他们可以作为boost::asio的api函数的参数使用。
  • 简单概括一下,我们可以用buffer()函数生成我们要用的缓存存储数据。
  • 比如boost的发送接口send要求的参数为ConstBufferSequence类型
1
2
template<typename ConstBufferSequence>
std::size_t send(const ConstBufferSequence & buffers);

我们需要将”Hello Word转化为该类型”

1
2
3
4
5
6
void use_const_buffer() {
std::string buf = "hello world!";
asio::const_buffer asio_buf(buf.c_str(), buf.length());
std::vector<asio::const_buffer> buffers_sequence;
buffers_sequence.push_back(asio_buf);
}

最终buffers_sequence就是可以传递给发送接口send的类型。但是这太复杂了,可以直接用buffer函数转化为send需要的参数类型

1
2
3
void use_buffer_str() {
asio::const_buffers_1 output_buf = asio::buffer("hello world");
}

output_buf可以直接传递给该send接口。我们也可以将数组转化为send接受的类型

1
2
3
4
5
void use_buffer_array(){
const size_t BUF_SIZE_BYTES = 20;
std::unique_ptr<char[] > buf(new char[BUF_SIZE_BYTES]);
auto input_buf = asio::buffer(static_cast<void*>(buf.get()), BUF_SIZE_BYTES);
}

对于流式操作,我们可以用streambuf,将输入输出流和streambuf绑定,可以实现流式输入和输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
void use_stream_buffer() {
asio::streambuf buf;
std::ostream output(&buf);
// 将消息写入基于流的缓冲区。
output << "Message1\nMessage2";
// 现在我们想要从流缓冲区中读取所有数据,直到遇到'\n'分隔符。
// 实例化一个输入流,它使用我们的流缓冲区。
std::istream input(&buf);
// 我们将把数据读入这个字符串中。
std::string message1;
std::getline(input, message1);
// 现在message1字符串中包含'Message1'。
}

同步写write_some

  • boost::asio提供了几种同步写的api,write_some可以每次向指定的空间写入固定的字节数,如果写缓冲区满了,就只写一部分,返回写入的字节数。
1
2
3
4
5
6
7
8
9
10
11
void write_to_socket(asio::ip::tcp::socket& sock) {
std::string buf = "Hello World!";
std::size_t total_bytes_written = 0;
// 循环发送
// write_some返回每次写入的字节数
// total_bytes_written是已经发送的字节数。
// 每次发送buf.length()- total_bytes_written)字节数据
while (total_bytes_written != buf.length()) {
total_bytes_written += sock.write_some(asio::buffer(buf.c_str() + total_bytes_written, buf.length() - total_bytes_written));
}
}

同步写send

  • write_some使用起来比较麻烦,需要多次调用,asio提供了send函数。send函数会一次性将buffer中的内容发送给对端,如果有部分字节因为发送缓冲区满无法发送,则阻塞等待,直到发送缓冲区可用,则继续发送完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int send_data_by_send() {
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;
try {
asio::ip::tcp::endpoint ep(asio::ip::address::from_string(raw_ip_address), port_num);
asio::io_context ios;
// 步骤1:分配并打开socket。
asio::ip::tcp::socket sock(ios, ep.protocol());
sock.connect(ep);
std::string buf = "Hello World!";
int send_length = sock.send(asio::buffer(buf.c_str(), buf.length()));
if (send_length <= 0) {
cout << "发送失败" << endl;
return 0;
}
}
catch (system::system_error& e) {
std::cout << "发生错误!错误代码 = " << e.code()
<< "。消息:" << e.what();
return e.code().value();
}
return 0;
}

同步写write

  • 类似send方法,asio还提供了一个write函数,可以一次性将所有数据发送给对端,如果发送缓冲区满了则阻塞,直到发送缓冲区可用,将数据发送完成。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    int send_data_by_wirte() {
    std::string raw_ip_address = "127.0.0.1";
    unsigned short port_num = 3333;
    try {
    asio::ip::tcp::endpoint
    ep(asio::ip::address::from_string(raw_ip_address),
    port_num);
    asio::io_service ios;
    // Step 1. Allocating and opening the socket.
    asio::ip::tcp::socket sock(ios, ep.protocol());
    sock.connect(ep);
    std::string buf = "Hello World!";
    int send_length = asio::write(sock, asio::buffer(buf.c_str(), buf.length()));
    if (send_length <= 0) {
    cout << "send failed" << endl;
    return 0;
    }
    }
    catch (system::system_error& e) {
    std::cout << "Error occured! Error code = " << e.code()
    << ". Message: " << e.what();
    return e.code().value();
    }
    return 0;
    }

同步读read_some

  • 同步读和同步写类似,提供了读取指定字节数的接口read_some
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    std::string read_from_socket(asio::ip::tcp::socket& sock) {
    const unsigned char MESSAGE_SIZE = 7;
    char buf[MESSAGE_SIZE];
    std::size_t total_bytes_read = 0;
    while (total_bytes_read != MESSAGE_SIZE) {
    total_bytes_read += sock.read_some(
    asio::buffer(buf + total_bytes_read,
    MESSAGE_SIZE - total_bytes_read));
    }
    return std::string(buf, total_bytes_read);
    }
    int read_data_by_read_some() {
    std::string raw_ip_address = "127.0.0.1";
    unsigned short port_num = 3333;
    try {
    asio::ip::tcp::endpoint
    ep(asio::ip::address::from_string(raw_ip_address),
    port_num);
    asio::io_service ios;
    asio::ip::tcp::socket sock(ios, ep.protocol());
    sock.connect(ep);
    read_from_socket(sock);
    }
    catch (system::system_error& e) {
    std::cout << "Error occured! Error code = " << e.code()
    << ". Message: " << e.what();
    return e.code().value();
    }
    return 0;
    }

同步读receive

  • 可以一次性同步接收对方发送的数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    int read_data_by_receive() {
    std::string raw_ip_address = "127.0.0.1";
    unsigned short port_num = 3333;
    try {
    asio::ip::tcp::endpoint
    ep(asio::ip::address::from_string(raw_ip_address),
    port_num);
    asio::io_service ios;
    asio::ip::tcp::socket sock(ios, ep.protocol());
    sock.connect(ep);
    const unsigned char BUFF_SIZE = 7;
    char buffer_receive[BUFF_SIZE];
    int receive_length = sock.receive(asio::buffer(buffer_receive, BUFF_SIZE));
    if (receive_length <= 0) {
    cout << "receive failed" << endl;
    }
    }
    catch (system::system_error& e) {
    std::cout << "Error occured! Error code = " << e.code()
    << ". Message: " << e.what();
    return e.code().value();
    }
    return 0;
    }

同步读read

  • 可以一次性同步读取对方发送的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int read_data_by_read() {
std::string raw_ip_address = "127.0.0.1";
unsigned short port_num = 3333;
try {
asio::ip::tcp::endpoint
ep(asio::ip::address::from_string(raw_ip_address),
port_num);
asio::io_service ios;
asio::ip::tcp::socket sock(ios, ep.protocol());
sock.connect(ep);
const unsigned char BUFF_SIZE = 7;
char buffer_receive[BUFF_SIZE];
int receive_length = asio::read(sock, asio::buffer(buffer_receive, BUFF_SIZE));
if (receive_length <= 0) {
cout << "receive failed" << endl;
}
}
catch (system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}

读取直到指定字符

  • 我们可以一直读取,直到读取指定字符结束
1
2
3
4
5
6
7
8
9
10
std::string read_data_by_until(asio::ip::tcp::socket& sock) {
asio::streambuf buf;
// 同步从套接字读取数据,直到遇到'\n'符号。
asio::read_until(sock, buf, '\n');
std::string message;
// 因为缓冲区'buf'可能包含'\n'符号后的其他数据,所以我们需要解析缓冲区,并提取分隔符前的数据。
std::istream input_stream(&buf);
std::getline(input_stream, message);
return message;
}

asio异步读写操作

异步写操作

在写操作前,我们先封装一个Node结构,用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//最大报文接收大小
const int RECVSIZE = 1024;
class MsgNode {
public :
MsgNode(const char* msg, int total_len): _total_len(total_len), _cur_len(0){
_msg = new char[total_len];
memcpy(_msg, msg, total_len);
}
MsgNode(int total_len) :_total_len(total_len), _cur_len(0) {
_msg = new char[total_len];
}
~MsgNode(){
delete[]_msg;
}
//消息首地址
char* _msg;
//总长度
int _total_len;
//当前长度
int _cur_len;
};

写了两个构造函数,两个参数的负责构造写节点,一个参数的负责构造读节点。
接下来为Session添加异步写操作和负责发送写数据的节点

  • 需要一个回调函数WriteCallBackErr,其中bytes_transferred代表我们写了多少字符
  • WriteToSocketErr是封装的异步写函数
1
2
3
4
5
6
7
class Session{
public:
void WriteCallBackErr(const boost::system::error_code & ec, std::size_t bytes_transferred,std::shared_ptr<MsgNode>); //
void WriteToSocketErr(const std::string& buf);
private:
std::shared_ptr<MsgNode> _send_node;
};

WriteToSocketErr函数为我们封装的写操作,WriteCallBackErr为异步写操作回调的函数,为什么会有三个参数呢,我们可以看一下asio源码

1
2
3
4
5
6
7
8
BOOST_ASIO_COMPLETION_TOKEN_FOR(void (boost::system::error_code,
std::size_t)) WriteToken
BOOST_ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
BOOST_ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(WriteToken,
void (boost::system::error_code, std::size_t))
async_write_some(const ConstBufferSequence& buffers,
BOOST_ASIO_MOVE_ARG(WriteToken)token
BOOST_ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))

async_write_some是异步写的函数,这个异步写函数有两个参数,第一个参数为ConstBufferSequence常引用类型的buffers,第二个参数为WriteToken类型,而WriteToken在上面定义了,是一个函数对象类型,返回值为void,参数为error_code和size_t,所以我们为了调用async_write_some函数也要传入一个符合WriteToken定义的函数,就是我们声明的WriteCallBackErr函数,前两个参数为WriteToken规定的参数,第三个参数为MsgNode的智能指针,这样通过智能指针保证我们发送的Node生命周期延长。我们看一下WriteToSocketErr函数的具体实现

1
2
3
4
5
6
7
8
void Session::WriteToSocketErr(const std::string& buf) {
_send_node = make_shared<MsgNode>(buf.c_str(), buf.length());
//异步发送数据,因为异步所以不会一下发送完
this->_socket->async_write_some(asio::buffer(_send_node->_msg,
_send_node->_total_len),
std::bind(&Session::WriteCallBackErr,
this, std::placeholders::_1, std::placeholders::_2, _send_node));
}

因为WriteCallBackErr函数为三个参数且为成员函数,而async_write_some需要的回调函数为两个参数,所以我们通过bind将三个参数转换为两个参数的普通函数。
我们看看回调函数的实现

1
2
3
4
5
6
7
8
9
10
11
void Session::WriteCallBackErr(const boost::system::error_code& ec, 
std::size_t bytes_transferred, std::shared_ptr<MsgNode> msg_node) {
if (bytes_transferred + msg_node->_cur_len
< msg_node->_total_len) {
_send_node->_cur_len += bytes_transferred;
this->_socket->async_write_some(asio::buffer(_send_node->_msg+_send_node->_cur_len,
_send_node->_total_len-_send_node->_cur_len),
std::bind(&Session::WriteCallBackErr,
this, std::placeholders::_1, std::placeholders::_2, _send_node));
}
}

WriteCallBackErr函数里判断如果已经发送的字节数没有达到要发送的总字节数,那么久更新节点已经发送的长度,然后计算剩余要发送的长度,如果有数据未发送完,再次调用async_write_some函数异步发送。
但是这个函数并不能投入实际应用,因为async_write_some回调函数返回已发送的字节数可能并不是全部长度。比如TCP发送缓存区总大小为8字节,但是有3字节未发送(上一次未发送完),这样剩余空间为5字节

image.png 此时我们调用`async_write_some`发送hello world!实际发送的长度就是为5,也就是只发送了hello,剩余world!通过我们的回调继续发送。 而实际开发的场景用户是不清楚底层tcp的多路复用调用情况的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用`WriteToSocketErr`,因为boost::asio封装的时epoll和iocp等多路复用模型,当写事件就绪后就发数据,发送的数据按照async_write_some调用的顺序发送,所以回调函数内调用的async_write_some可能并没有被及时调用。 比如我们如下代码
1
2
3
4
//用户发送数据
WriteToSocketErr("Hello World!");
//用户无感知下层调用情况又一次发送了数据
WriteToSocketErr("Hello World!");
那么很可能第一次只发送了Hello,后面的数据没发完,第二次发送了Hello World!之后又发送了World! 所以对端收到的数据很可能是”HelloHello World! World!” 那怎么解决这个问题呢,我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理
1
2
3
4
5
6
7
8
9
class Session{
public:
void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
void WriteToSocket(const std::string &buf);
private:
std::queue<std::shared_ptr<MsgNode>> _send_queue;
std::shared_ptr<asio::ip::tcp::socket> _socket;
bool _send_pending;
};

定义了bool变量_send_pending,该变量为true表示一个节点还未发送完。
_send_queue用来缓存要发送的消息节点,是一个队列。
我们实现异步发送功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void Session::WriteToSocket(const std::string& buf){
//插入发送队列
_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
//pending状态说明上一次有未发送完的数据
if (_send_pending) {
return;
}
//异步发送数据,因为异步所以不会一下发送完
this->_socket->async_write_some(asio::buffer(buf), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
_send_pending = true;
}
void Session::WriteCallBack(const boost::system::error_code & ec, std::size_t bytes_transferred){
if (ec.value() != 0) {
std::cout << "Error , code is " << ec.value() << " . Message is " << ec.message();
return;
}
//取出队首元素即当前未发送完数据
auto & send_data = _send_queue.front();
send_data->_cur_len += bytes_transferred;
//数据未发送完, 则继续发送
if (send_data->_cur_len < send_data->_total_len) {
this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len-send_data->_cur_len),
std::bind(&Session::WriteCallBack,
this, std::placeholders::_1, std::placeholders::_2));
return;
}
//如果发送完,则pop出队首元素
_send_queue.pop();
//如果队列为空,则说明所有数据都发送完,将pending设置为false
if (_send_queue.empty()) {
_send_pending = false;
}
//如果队列不是空,则继续将队首元素发送
if (!_send_queue.empty()) {
auto& send_data = _send_queue.front();
this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
std::bind(&Session::WriteCallBack,
this, std::placeholders::_1, std::placeholders::_2));
}
}

async_write_some函数不能保证每次回调函数触发时发送的长度为要总长度,这样我们每次都要在回调函数判断发送数据是否完成,asio提供了一个更简单的发送函数async_send,这个函数在发送的长度未达到我们要求的长度时就不会触发回调,所以触发回调函数时要么时发送出错了要么是发送完成了,其内部的实现原理就是帮我们不断的调用async_write_some直到完成发送,所以async_send不能和async_write_some混合使用,我们基于async_send封装另外一个发送函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//不能与async_write_some混合使用
void Session::WriteAllToSocket(const std::string& buf) {
//插入发送队列
_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
//pending状态说明上一次有未发送完的数据
if (_send_pending) {
return;
}
//异步发送数据,因为异步所以不会一下发送完
this->_socket->async_send(asio::buffer(buf),
std::bind(&Session::WriteAllCallBack, this,
std::placeholders::_1, std::placeholders::_2));
_send_pending = true;
}
void Session::WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){
if (ec.value() != 0) {
std::cout << "Error occured! Error code = "
<< ec.value()
<< ". Message: " << ec.message();
return;
}
//如果发送完,则pop出队首元素
_send_queue.pop();
//如果队列为空,则说明所有数据都发送完,将pending设置为false
if (_send_queue.empty()) {
_send_pending = false;
}
//如果队列不是空,则继续将队首元素发送
if (!_send_queue.empty()) {
auto& send_data = _send_queue.front();
this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
std::bind(&Session::WriteAllCallBack,
this, std::placeholders::_1, std::placeholders::_2));
}
}

异步读操作

接下来介绍异步读操作,异步读操作和异步的写操作类似同样又async_read_some和async_receive函数,前者触发的回调函数获取的读数据的长度可能会小于要求读取的总长度,后者触发的回调函数读取的数据长度等于读取的总长度。
先基于async_read_some封装一个读取的函数ReadFromSocket,同样在Session类的声明中添加一些变量

1
2
3
4
5
6
7
8
9
class Session {
public:
void ReadFromSocket();
void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
private:
std::shared_ptr<asio::ip::tcp::socket> _socket;
std::shared_ptr<MsgNode> _recv_node;
bool _recv_pending;
};

_recv_node用来缓存接收的数据,_recv_pending为true表示节点正在接收数据,还未接受完。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//不考虑粘包情况, 先用固定的字节接收
void Session::ReadFromSocket() {
if (_recv_pending) {
return;
}
//可以调用构造函数直接构造,但不可用已经构造好的智能指针赋值
/*auto _recv_nodez = std::make_unique<MsgNode>(RECVSIZE);
_recv_node = _recv_nodez;*/
_recv_node = std::make_shared<MsgNode>(RECVSIZE);
_socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this,
std::placeholders::_1, std::placeholders::_2));
_recv_pending = true;
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){
_recv_node->_cur_len += bytes_transferred;
//没读完继续读
if (_recv_node->_cur_len < _recv_node->_total_len) {
_socket->async_read_some(asio::buffer(_recv_node->_msg+_recv_node->_cur_len,
_recv_node->_total_len - _recv_node->_cur_len), std::bind(&Session::ReadCallBack, this,
std::placeholders::_1, std::placeholders::_2));
return;
}
//将数据投递到队列里交给逻辑线程处理,此处略去
//如果读完了则将标记置为false
_recv_pending = false;
//指针置空
_recv_node = nullptr;
}

我们基于async_receive再封装一个接收数据的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void Session::ReadAllFromSocket(const std::string& buf) {
if (_recv_pending) {
return;
}
//可以调用构造函数直接构造,但不可用已经构造好的智能指针赋值
/*auto _recv_nodez = std::make_unique<MsgNode>(RECVSIZE);
_recv_node = _recv_nodez;*/
_recv_node = std::make_shared<MsgNode>(RECVSIZE);
_socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadAllCallBack, this,
std::placeholders::_1, std::placeholders::_2));
_recv_pending = true;
}
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
_recv_node->_cur_len += bytes_transferred;
//将数据投递到队列里交给逻辑线程处理,此处略去
//如果读完了则将标记置为false
_recv_pending = false;
//指针置空
_recv_node = nullptr;
}

同样async_read_someasync_receive不能混合使用,否则会出现逻辑问题。

异步读写的服务器示例

Session类

Session类主要是处理客户端消息收发的会话类,为了简单起见,我们不考虑粘包问题,也不考虑支持手动调用发送的接口,只以应答的方式发送和接收固定长度(1024字节长度)的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Session
{
public:
Session(boost::asio::io_context& ioc):_socket(ioc){
}
tcp::socket& Socket() {
return _socket;
}
void Start();
private:
void handle_read(const boost::system::error_code & error, size_t bytes_transfered);
void handle_write(const boost::system::error_code& error);
tcp::socket _socket;
enum {max_length = 1024};
char _data[max_length];
};
  1. _data用来接收客户端传递的数据
  2. _socket为单独处理客户端读写的socket。
  3. handle_readhandle_write分别为读回调函数和写回调函数。
    接下来我们实现Session类
    1
    2
    3
    4
    5
    6
    7
    void Session::Start(){
    memset(_data, 0, max_length);
    _socket.async_read_some(boost::asio::buffer(_data, max_length),
    std::bind(&Session::handle_read, this, placeholders::_1,
    placeholders::_2)
    );
    }

在Start方法中我们调用异步读操作,监听对端发送的消息。当对端发送数据后,触发handle_read函数

1
2
3
4
5
6
7
8
9
10
void Session::handle_read(const boost::system::error_code& error, size_t bytes_transfered) {
if (!error) {
cout << "server receive data is " << _data << endl;
boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered),
std::bind(&Session::handle_write, this, placeholders::_1));
}
else {
delete this;
}
}

handle_read函数内将收到的数据发送给对端,当发送完成后触发handle_write回调函数。

1
2
3
4
5
6
7
8
9
10
void Session::handle_write(const boost::system::error_code& error) {
if (!error) {
memset(_data, 0, max_length);
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read,
this, placeholders::_1, placeholders::_2));
}
else {
delete this;
}
}

handle_write函数内又一次监听了读事件,如果对端有数据发送过来则触发handle_read,我们再将收到的数据发回去。从而达到应答式服务的效果。

Server类

Server类为服务器接收连接的管理类

1
2
3
4
5
6
7
8
9
class Server {
public:
Server(boost::asio::io_context& ioc, short port);
private:
void start_accept();
void handle_accept(Session* new_session, const boost::system::error_code& error);
boost::asio::io_context& _ioc;
tcp::acceptor _acceptor;
};

start_accept将要接收连接的acceptor绑定到服务上,其内部就是将accpeptor对应的socket描述符绑定到epolliocp模型上,实现事件驱动。
handle_accept为新连接到来后触发的回调函数。
下面是具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Server::Server(boost::asio::io_context& ioc, short port) :_ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {
start_accept();
}
void Server::start_accept() {
Session* new_session = new Session(_ioc);
_acceptor.async_accept(new_session->Socket(),
std::bind(&Server::handle_accept, this, new_session, placeholders::_1));
}
void Server::handle_accept(Session* new_session, const boost::system::error_code& error) {
if (!error) {
new_session->Start();
}
else {
delete new_session;
}
start_accept();
}