ZMQ DEMO 分布式处理

Table of Contents

    如图所示:一个工作派发器(dispatcher),多个工作者(worker),一个结果收集器(collector)。 dispatcher不断的派发工作给多个worker处理,worker将处理完成的结果发送给collector,collector来统计结果。

    注意点:

    • bind和connect不会阻塞;
    • send和recv会阻塞;
    • 只有send或者只有recv一方时会阻塞;
    • 如果send和recv是配对的,暂时不会阻塞,发送和接收都有个缓冲队列,如果缓冲队列满了还是会阻塞;
    • 缓冲队列设置,setsockopt(zmq.SNDHWM, 2000),默认是1000个消息;
    • 当send和recv速度不匹配时,速度快的一方的缓冲队列满了就会阻塞,缓冲队列被另一方消费后就又恢复畅通了;

    代码: dispatcher

    #include "zmq.hpp"
    #include <string>
    #include <iostream>
    #include <algorithm>
    #include <random>
    #include <thread>
    
    int main()
    {
        std::default_random_engine engine;
        std::uniform_int_distribution<int> u(0, 100);
        try {
            zmq::context_t context(1);
            zmq::socket_t sender(context, zmq::socket_type::push);
            sender.bind("tcp://127.0.0.1:5557");
    
            zmq::socket_t signaler(context, zmq::socket_type::push);
            signaler.connect("tcp://127.0.0.1:5558");
    
            const int TASK_COUNT = 1000;
            std::string strCount = std::to_string(TASK_COUNT);
            signaler.send(zmq::const_buffer(strCount.c_str(), strCount.size()));
            std::cout << "event count:" << strCount << std::endl;
            for (int i = 0; i < TASK_COUNT; i++) {
                int n = u(engine);
                std::cout << "event i:" << i << ", data:" << n << std::endl;
                std::string str = std::to_string(n);
                zmq::const_buffer msg(str.c_str(), str.size());
                sender.send(msg);
            }
        } catch (zmq::error_t &e) {
            std::cerr << "exception:" << e.what() << std::endl;
        }
        zmq_sleep(1);
        system("pause");
        return 0;
    }
    

    worker

    #include <iostream>
    #include <zmq.hpp>
    #include <thread>
    
    int main()
    {
        try {
    
            zmq::context_t context(1);
            zmq::socket_t receiver(context, zmq::socket_type::pull);
            receiver.connect("tcp://127.0.0.1:5557");
    
            zmq::socket_t sender(context, zmq::socket_type::push);
            sender.connect("tcp://127.0.0.1:5558");
    
            int handleCount = 0;
            while (1) {
                zmq::message_t msg;
                receiver.recv(msg);
                std::string str = msg.to_string();
                std::cout << "sleep millisecond:" << str << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(atoi(str.c_str())));
                sender.send(zmq::const_buffer(str.c_str(), str.size()));
                ++handleCount;
                std::cout << "handle count:" << handleCount << std::endl;
            }
        } catch (zmq::error_t &e) {
            std::cout << e.what() << std::endl;
        }
    
        system("pause");
        return 0;
    }
    

    collector

    int main()
    {
        try {
            zmq::context_t context(1);
            zmq::socket_t receiver(context, zmq::socket_type::pull);
            receiver.bind("tcp://127.0.0.1:5558");
    
            int total = 0;
            int i = 0;
            while (1) {
                zmq::message_t buffer;
                receiver.recv(buffer);
                std::string str = buffer.to_string();
                total += atoi(str.c_str());
                std::cout << "i:" << i++ << ", msg:" << buffer.to_string() << ", total:" << total << std::endl;
            }
        } catch (zmq::error_t &e) {
            std::cout << e.what() << std::endl;
        }
    
        system("pause");
        return 0;
    }