Skip to content

ZMQ DEMO pull方法的使用

Published: at 04:30 PM | 4 min read

多个socket recv

接收多个socket的数据时如果在同一个线程中,其中一个socket阻塞了另外的socket也会受到影响,所以我们需要使用dontwait标志,异步接收消息,如果消息不存在时不阻塞直接返回,如下代码:

int main()
{
    try {
        zmq::context_t context(1);
        zmq::socket_t pullSocket(context, zmq::socket_type::pull);
        pullSocket.connect("tcp://127.0.0.1:5557");

        zmq::socket_t subSocket(context, zmq::socket_type::sub);
        subSocket.connect("tcp://127.0.0.1:5558");
        subSocket.set(zmq::sockopt::subscribe, "");

        while (1) {
            bool wait = false;
            zmq::message_t buffer1, buffer2;
            if (pullSocket.recv(buffer1, zmq::recv_flags::dontwait)) {
                std::cout << "pull buffer:" << buffer1.to_string() << std::endl;
            } else {
                wait = true;
            }
            
            if (subSocket.recv(buffer2, zmq::recv_flags::dontwait)) {
                std::cout << "sub buffer:" << buffer2.to_string() << std::endl;
            } else {
                wait = true;
            }

            if (wait) {
                zmq_sleep(1);
                std::cout << "continue\n";
            }
        }
    } catch (zmq::error_t &e) {
        std::cout << e.what() << std::endl;
    }
    
    system("pause");
    return 0;
}

上面代码建立了两个socket,一个pull 5557端口的消息,另一个subscribe 5558端口的消息,我们先收pull消息,再收sub消息,如果都没有接收到就等待1秒钟。 问题就出在这里,如果等待的时间长了会降低服务处理效率,如果等待时间短了(或者不等待)会造成while循环频繁执行给CPU造成很大的压力。

介绍poll

解决上面的问题,可以使用zmq::poll函数,第一个参数传入多个poll item也就是socket,第二个参数是个数,第三个参数是超时时间。

typedef struct zmq_pollitem_t
{
    void *socket;
    zmq_fd_t fd;
    short events;
    short revents;
} zmq_pollitem_t;

inline int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_ = -1)

当有事件发生时(有数据)poll函数会立即返回,没有时会阻塞(timeout等于-1会一直阻塞)直到超时,继续往下执行。

这里的好处是,在阻塞状态时如果有消息过来会立即解决阻塞状态,不会像上面那样直到sleep完才能继续执行。

poll代码演示

server端代码,绑定两个端口,一个push一个pub。

#include "zmq.hpp"
#include <string>
#include <iostream>
#include <algorithm>
#include <random>
#include <thread>

int main()
{
    std::default_random_engine engine;
    std::uniform_int_distribution<int> u1(0, 100);
    std::uniform_int_distribution<int> u2(10000, 99999);
    try {
        zmq::context_t context(1);
        zmq::socket_t pushSocket(context, zmq::socket_type::push);
        pushSocket.bind("tcp://127.0.0.1:5557");

        zmq::socket_t pubSocket(context, zmq::socket_type::pub);
        pubSocket.bind("tcp://127.0.0.1:5558");

        while (1) {
            int i1 = u1(engine);
            int i2 = u2(engine);
            std::string str1 = std::to_string(i1);
            std::string str2 = std::to_string(i2);
            std::cout << "i1:" << i1 << ", i2:" << i2 << std::endl;
            pushSocket.send(zmq::const_buffer(str1.c_str(), str1.size()));
            pubSocket.send(zmq::const_buffer(str2.c_str(), str2.size()));
            zmq_sleep(1);
        }
    } catch (zmq::error_t &e) {
        std::cerr << "exception:" << e.what() << std::endl;
    }
    return 0;
}

client代码,一个pull一个sub。

#include <iostream>
#include <zmq.hpp>
#include <thread>
#include <vector>

int main()
{
    try {
        zmq::context_t context(1);
        zmq::socket_t pullSocket(context, zmq::socket_type::pull);
        pullSocket.connect("tcp://127.0.0.1:5557");

        zmq::socket_t subSocket(context, zmq::socket_type::sub);
        subSocket.connect("tcp://127.0.0.1:5558");
        subSocket.set(zmq::sockopt::subscribe, "");

        zmq_pollitem_t items[] = {
            { pullSocket, 0, ZMQ_POLLIN, 0 },
            { subSocket, 0, ZMQ_POLLIN, 0 }
        };
        
        while (1) {
            zmq::poll(items, sizeof items / sizeof items[0], 1000);
            if (items[0].revents & ZMQ_POLLIN) {
                zmq::message_t msg;
                pullSocket.recv(msg);
                std::cout << "pull msg:" << msg.to_string() << std::endl;
            }
            if (items[1].revents & ZMQ_POLLIN) {
                zmq::message_t msg;
                subSocket.recv(msg);
                std::cout << "sub msg:" << msg.to_string() << std::endl;
            }
        }
    } catch (zmq::error_t &e) {
        std::cout << e.what() << std::endl;
    }
    
    system("pause");
    return 0;
}