如图所示:一个工作派发器(dispatcher),多个工作者(worker),一个结果收集器(collector)。 dispatcher不断的派发工作给多个worker处理,worker将处理完成的结果发送给collector,collector来统计结果。
注意点:
- bind和connect不会阻塞;
- send和recv会阻塞;
- 只有send或者只有recv一方时会阻塞;
- 如果send和recv是配对的,暂时不会阻塞,发送和接收都有个缓冲队列,如果缓冲队列满了还是会阻塞;
- 缓冲队列设置,setsockopt(zmq.SNDHWM, 2000),默认是1000个消息;
- 当send和recv速度不匹配时,速度快的一方的缓冲队列满了就会阻塞,缓冲队列被另一方消费后就又恢复畅通了;
代码: dispatcher
#include "zmq.hpp"
#include <string>
#include <iostream>
#include <algorithm>
#include <random>
#include <thread>
int main()
{
std::default_random_engine engine;
std::uniform_int_distribution<int> u(0, 100);
try {
zmq::context_t context(1);
zmq::socket_t sender(context, zmq::socket_type::push);
sender.bind("tcp://127.0.0.1:5557");
zmq::socket_t signaler(context, zmq::socket_type::push);
signaler.connect("tcp://127.0.0.1:5558");
const int TASK_COUNT = 1000;
std::string strCount = std::to_string(TASK_COUNT);
signaler.send(zmq::const_buffer(strCount.c_str(), strCount.size()));
std::cout << "event count:" << strCount << std::endl;
for (int i = 0; i < TASK_COUNT; i++) {
int n = u(engine);
std::cout << "event i:" << i << ", data:" << n << std::endl;
std::string str = std::to_string(n);
zmq::const_buffer msg(str.c_str(), str.size());
sender.send(msg);
}
} catch (zmq::error_t &e) {
std::cerr << "exception:" << e.what() << std::endl;
}
zmq_sleep(1);
system("pause");
return 0;
}
worker
#include <iostream>
#include <zmq.hpp>
#include <thread>
int main()
{
try {
zmq::context_t context(1);
zmq::socket_t receiver(context, zmq::socket_type::pull);
receiver.connect("tcp://127.0.0.1:5557");
zmq::socket_t sender(context, zmq::socket_type::push);
sender.connect("tcp://127.0.0.1:5558");
int handleCount = 0;
while (1) {
zmq::message_t msg;
receiver.recv(msg);
std::string str = msg.to_string();
std::cout << "sleep millisecond:" << str << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(atoi(str.c_str())));
sender.send(zmq::const_buffer(str.c_str(), str.size()));
++handleCount;
std::cout << "handle count:" << handleCount << std::endl;
}
} catch (zmq::error_t &e) {
std::cout << e.what() << std::endl;
}
system("pause");
return 0;
}
collector
int main()
{
try {
zmq::context_t context(1);
zmq::socket_t receiver(context, zmq::socket_type::pull);
receiver.bind("tcp://127.0.0.1:5558");
int total = 0;
int i = 0;
while (1) {
zmq::message_t buffer;
receiver.recv(buffer);
std::string str = buffer.to_string();
total += atoi(str.c_str());
std::cout << "i:" << i++ << ", msg:" << buffer.to_string() << ", total:" << total << std::endl;
}
} catch (zmq::error_t &e) {
std::cout << e.what() << std::endl;
}
system("pause");
return 0;
}