Skip to content

ZMQ DEMO 分布式处理

Published: at 04:32 PM | 3 min read

如图所示:系统由一个工作派发器(dispatcher)、多个工作者(worker)和一个结果收集器(collector)组成。dispatcher 不断地把任务派发给多个 worker 处理,worker 将处理完成的结果发送给 collector,由 collector 统计结果。

注意点:

代码: dispatcher

#include "zmq.hpp"
#include <string>
#include <iostream>
#include <algorithm>
#include <random>
#include <thread>

int main()
{
    std::default_random_engine engine;
    std::uniform_int_distribution<int> u(0, 100);
    try {
        zmq::context_t context(1);
        zmq::socket_t sender(context, zmq::socket_type::push);
        sender.bind("tcp://127.0.0.1:5557");

        zmq::socket_t signaler(context, zmq::socket_type::push);
        signaler.connect("tcp://127.0.0.1:5558");

        const int TASK_COUNT = 1000;
        std::string strCount = std::to_string(TASK_COUNT);
        signaler.send(zmq::const_buffer(strCount.c_str(), strCount.size()));
        std::cout << "event count:" << strCount << std::endl;
        for (int i = 0; i < TASK_COUNT; i++) {
            int n = u(engine);
            std::cout << "event i:" << i << ", data:" << n << std::endl;
            std::string str = std::to_string(n);
            zmq::const_buffer msg(str.c_str(), str.size());
            sender.send(msg);
        }
    } catch (zmq::error_t &e) {
        std::cerr << "exception:" << e.what() << std::endl;
    }
    zmq_sleep(1);
    system("pause");
    return 0;
}

worker

#include <iostream>
#include <zmq.hpp>
#include <thread>

int main()
{
    try {

        zmq::context_t context(1);
        zmq::socket_t receiver(context, zmq::socket_type::pull);
        receiver.connect("tcp://127.0.0.1:5557");

        zmq::socket_t sender(context, zmq::socket_type::push);
        sender.connect("tcp://127.0.0.1:5558");

        int handleCount = 0;
        while (1) {
            zmq::message_t msg;
            receiver.recv(msg);
            std::string str = msg.to_string();
            std::cout << "sleep millisecond:" << str << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(atoi(str.c_str())));
            sender.send(zmq::const_buffer(str.c_str(), str.size()));
            ++handleCount;
            std::cout << "handle count:" << handleCount << std::endl;
        }
    } catch (zmq::error_t &e) {
        std::cout << e.what() << std::endl;
    }

    system("pause");
    return 0;
}

collector

int main()
{
    try {
        zmq::context_t context(1);
        zmq::socket_t receiver(context, zmq::socket_type::pull);
        receiver.bind("tcp://127.0.0.1:5558");

        int total = 0;
        int i = 0;
        while (1) {
            zmq::message_t buffer;
            receiver.recv(buffer);
            std::string str = buffer.to_string();
            total += atoi(str.c_str());
            std::cout << "i:" << i++ << ", msg:" << buffer.to_string() << ", total:" << total << std::endl;
        }
    } catch (zmq::error_t &e) {
        std::cout << e.what() << std::endl;
    }

    system("pause");
    return 0;
}