多个socket recv
接收多个socket的数据时如果在同一个线程中,其中一个socket阻塞了另外的socket也会受到影响,所以我们需要使用dontwait标志,异步接收消息,如果消息不存在时不阻塞直接返回,如下代码:
int main()
{
try {
zmq::context_t context(1);
zmq::socket_t pullSocket(context, zmq::socket_type::pull);
pullSocket.connect("tcp://127.0.0.1:5557");
zmq::socket_t subSocket(context, zmq::socket_type::sub);
subSocket.connect("tcp://127.0.0.1:5558");
subSocket.set(zmq::sockopt::subscribe, "");
while (1) {
bool wait = false;
zmq::message_t buffer1, buffer2;
if (pullSocket.recv(buffer1, zmq::recv_flags::dontwait)) {
std::cout << "pull buffer:" << buffer1.to_string() << std::endl;
} else {
wait = true;
}
if (subSocket.recv(buffer2, zmq::recv_flags::dontwait)) {
std::cout << "sub buffer:" << buffer2.to_string() << std::endl;
} else {
wait = true;
}
if (wait) {
zmq_sleep(1);
std::cout << "continue\n";
}
}
} catch (zmq::error_t &e) {
std::cout << e.what() << std::endl;
}
system("pause");
return 0;
}
上面代码建立了两个socket,一个pull 5557端口的消息,另一个subscribe 5558端口的消息,我们先收pull消息,再收sub消息,如果都没有接收到就等待1秒钟。 问题就出在这里,如果等待的时间长了会降低服务处理效率,如果等待时间短了(或者不等待)会造成while循环频繁执行给CPU造成很大的压力。
介绍poll
解决上面的问题,可以使用zmq::poll函数,第一个参数传入多个poll item也就是socket,第二个参数是个数,第三个参数是超时时间。
typedef struct zmq_pollitem_t
{
void *socket;
zmq_fd_t fd;
short events;
short revents;
} zmq_pollitem_t;
inline int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_ = -1)
当有事件发生时(有数据)poll函数会立即返回,没有时会阻塞(timeout等于-1会一直阻塞)直到超时,继续往下执行。
这里的好处是,在阻塞状态时如果有消息过来会立即解决阻塞状态,不会像上面那样直到sleep完才能继续执行。
poll代码演示
server端代码,绑定两个端口,一个push一个pub。
#include "zmq.hpp"
#include <string>
#include <iostream>
#include <algorithm>
#include <random>
#include <thread>
int main()
{
std::default_random_engine engine;
std::uniform_int_distribution<int> u1(0, 100);
std::uniform_int_distribution<int> u2(10000, 99999);
try {
zmq::context_t context(1);
zmq::socket_t pushSocket(context, zmq::socket_type::push);
pushSocket.bind("tcp://127.0.0.1:5557");
zmq::socket_t pubSocket(context, zmq::socket_type::pub);
pubSocket.bind("tcp://127.0.0.1:5558");
while (1) {
int i1 = u1(engine);
int i2 = u2(engine);
std::string str1 = std::to_string(i1);
std::string str2 = std::to_string(i2);
std::cout << "i1:" << i1 << ", i2:" << i2 << std::endl;
pushSocket.send(zmq::const_buffer(str1.c_str(), str1.size()));
pubSocket.send(zmq::const_buffer(str2.c_str(), str2.size()));
zmq_sleep(1);
}
} catch (zmq::error_t &e) {
std::cerr << "exception:" << e.what() << std::endl;
}
return 0;
}
client代码,一个pull一个sub。
#include <iostream>
#include <zmq.hpp>
#include <thread>
#include <vector>
int main()
{
try {
zmq::context_t context(1);
zmq::socket_t pullSocket(context, zmq::socket_type::pull);
pullSocket.connect("tcp://127.0.0.1:5557");
zmq::socket_t subSocket(context, zmq::socket_type::sub);
subSocket.connect("tcp://127.0.0.1:5558");
subSocket.set(zmq::sockopt::subscribe, "");
zmq_pollitem_t items[] = {
{ pullSocket, 0, ZMQ_POLLIN, 0 },
{ subSocket, 0, ZMQ_POLLIN, 0 }
};
while (1) {
zmq::poll(items, sizeof items / sizeof items[0], 1000);
if (items[0].revents & ZMQ_POLLIN) {
zmq::message_t msg;
pullSocket.recv(msg);
std::cout << "pull msg:" << msg.to_string() << std::endl;
}
if (items[1].revents & ZMQ_POLLIN) {
zmq::message_t msg;
subSocket.recv(msg);
std::cout << "sub msg:" << msg.to_string() << std::endl;
}
}
} catch (zmq::error_t &e) {
std::cout << e.what() << std::endl;
}
system("pause");
return 0;
}