文章标题 原创 翻译 转载 文章内容 # 多个socket recv 接收多个socket的数据时如果在同一个线程中,其中一个socket阻塞了另外的socket也会受到影响,所以我们需要使用dontwait标志,异步接收消息,如果消息不存在时不阻塞直接返回,如下代码: ``` int main() { try { zmq::context_t context(1); zmq::socket_t pullSocket(context, zmq::socket_type::pull); pullSocket.connect("tcp://127.0.0.1:5557"); zmq::socket_t subSocket(context, zmq::socket_type::sub); subSocket.connect("tcp://127.0.0.1:5558"); subSocket.set(zmq::sockopt::subscribe, ""); while (1) { bool wait = false; zmq::message_t buffer1, buffer2; if (pullSocket.recv(buffer1, zmq::recv_flags::dontwait)) { std::cout << "pull buffer:" << buffer1.to_string() << std::endl; } else { wait = true; } if (subSocket.recv(buffer2, zmq::recv_flags::dontwait)) { std::cout << "sub buffer:" << buffer2.to_string() << std::endl; } else { wait = true; } if (wait) { zmq_sleep(1); std::cout << "continue\n"; } } } catch (zmq::error_t &e) { std::cout << e.what() << std::endl; } system("pause"); return 0; } ``` 上面代码建立了两个socket,一个pull 5557端口的消息,另一个subscribe 5558端口的消息,我们先收pull消息,再收sub消息,如果都没有接收到就等待1秒钟。 问题就出在这里,如果等待的时间长了会降低服务处理效率,如果等待时间短了(或者不等待)会造成while循环频繁执行给CPU造成很大的压力。 # 介绍poll 解决上面的问题,可以使用zmq::poll函数,第一个参数传入多个poll item也就是socket,第二个参数是个数,第三个参数是超时时间。 ``` typedef struct zmq_pollitem_t { void *socket; zmq_fd_t fd; short events; short revents; } zmq_pollitem_t; inline int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_ = -1) ``` 当有事件发生时(有数据)poll函数会立即返回,没有时会阻塞(timeout等于-1会一直阻塞)直到超时,继续往下执行。 这里的好处是,在阻塞状态时如果有消息过来会立即解决阻塞状态,不会像上面那样直到sleep完才能继续执行。 # poll代码演示 server端代码,绑定两个端口,一个push一个pub。 ``` #include "zmq.hpp" #include <string> #include <iostream> #include <algorithm> #include <random> #include <thread> int main() { std::default_random_engine engine; std::uniform_int_distribution<int> u1(0, 100); std::uniform_int_distribution<int> u2(10000, 99999); try { zmq::context_t context(1); zmq::socket_t pushSocket(context, zmq::socket_type::push); pushSocket.bind("tcp://127.0.0.1:5557"); zmq::socket_t pubSocket(context, zmq::socket_type::pub); pubSocket.bind("tcp://127.0.0.1:5558"); while (1) { int i1 = u1(engine); int i2 = u2(engine); std::string str1 = std::to_string(i1); std::string str2 = std::to_string(i2); std::cout << "i1:" << i1 << ", i2:" << i2 << std::endl; pushSocket.send(zmq::const_buffer(str1.c_str(), str1.size())); pubSocket.send(zmq::const_buffer(str2.c_str(), str2.size())); zmq_sleep(1); } } catch (zmq::error_t &e) { std::cerr << "exception:" << e.what() << std::endl; } return 0; } ``` client代码,一个pull一个sub。 ``` #include <iostream> #include <zmq.hpp> #include <thread> #include <vector> int main() { try { zmq::context_t context(1); zmq::socket_t pullSocket(context, zmq::socket_type::pull); pullSocket.connect("tcp://127.0.0.1:5557"); zmq::socket_t subSocket(context, zmq::socket_type::sub); subSocket.connect("tcp://127.0.0.1:5558"); subSocket.set(zmq::sockopt::subscribe, ""); zmq_pollitem_t items[] = { { pullSocket, 0, ZMQ_POLLIN, 0 }, { subSocket, 0, ZMQ_POLLIN, 0 } }; while (1) { zmq::poll(items, sizeof items / sizeof items[0], 1000); if (items[0].revents & ZMQ_POLLIN) { zmq::message_t msg; pullSocket.recv(msg); std::cout << "pull msg:" << msg.to_string() << std::endl; } if (items[1].revents & ZMQ_POLLIN) { zmq::message_t msg; subSocket.recv(msg); std::cout << "sub msg:" << msg.to_string() << std::endl; } } } catch (zmq::error_t &e) { std::cout << e.what() << std::endl; } system("pause"); return 0; } ``` 文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交