pub sub inproc进程内通信
简介
进程内传输方式意味着在共享ZMQ context的线程间通过内存方式传输数据。
通讯地址必须保证已经被相同context上的一个socket创建了。
文件名必须是在与这个socket关联的ZMQ context上是唯一的,而且长度不能大于256字节。
示例说明
启动三个线程,一个推送线程,两个订阅线程。
推送线程1:推送A,B,C三种消息;
订阅线程2:订阅A,B消息;
订阅线程3:订阅所有消息;
所以,线程2只接受A,B消息,线程3接收A,B,C消息。
代码
// reqclient.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <string>
#include <iostream>
#include <algorithm>
#include <thread>
#include <mutex>
#include "zmq.hpp"
#include "zmq_addon.hpp"
static std::mutex s_mutex;
void PublisherThread(zmq::context_t *ctx) {
// Prepare publisher
zmq::socket_t publisher(*ctx, zmq::socket_type::pub);
publisher.bind("inproc://#1");
// Give the subscribers a chance to connect, so they don't lose any messages
std::this_thread::sleep_for(std::chrono::milliseconds(20));
while (true) {
// Write three messages, each with an envelope and content
publisher.send(zmq::str_buffer("A"), zmq::send_flags::sndmore);
publisher.send(zmq::str_buffer("Message in A envelope"));
publisher.send(zmq::str_buffer("B"), zmq::send_flags::sndmore);
publisher.send(zmq::str_buffer("Message in B envelope"));
publisher.send(zmq::str_buffer("C"), zmq::send_flags::sndmore);
publisher.send(zmq::str_buffer("Message in C envelope"));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
void SubscriberThread1(zmq::context_t *ctx) {
// Prepare subscriber
zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
subscriber.connect("inproc://#1");
// Thread2 opens "A" and "B" envelopes
subscriber.set(zmq::sockopt::subscribe, "A");
subscriber.set(zmq::sockopt::subscribe, "B");
while (1) {
// Receive all parts of the message
std::vector<zmq::message_t> recv_msgs;
zmq::recv_result_t result =
zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
assert(result && "recv failed");
assert(*result == 2);
std::lock_guard<std::mutex> lock(s_mutex);
std::cout << "Thread2: [" << recv_msgs[0].to_string() << "] "
<< recv_msgs[1].to_string() << std::endl;
}
}
void SubscriberThread2(zmq::context_t *ctx) {
// Prepare our context and subscriber
zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
subscriber.connect("inproc://#1");
// Thread3 opens ALL envelopes
subscriber.set(zmq::sockopt::subscribe, "");
while (1) {
// Receive all parts of the message
std::vector<zmq::message_t> recv_msgs;
zmq::recv_result_t result =
zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
assert(result && "recv failed");
assert(*result == 2);
std::lock_guard<std::mutex> lock(s_mutex);
std::cout << "Thread3: [" << recv_msgs[0].to_string() << "] "
<< recv_msgs[1].to_string() << std::endl;
}
}
int main() {
zmq::context_t ctx(0);
auto thread1 = std::thread(PublisherThread, &ctx);
// Give the publisher a chance to bind, since inproc requires it
std::this_thread::sleep_for(std::chrono::milliseconds(10));
auto thread2 = std::thread(SubscriberThread1, &ctx);
auto thread3 = std::thread(SubscriberThread2, &ctx);
thread1.join();
thread2.join();
thread3.join();
/*
* Output:
* An infinite loop of a mix of:
* Thread2: [A] Message in A envelope
* Thread2: [B] Message in B envelope
* Thread3: [A] Message in A envelope
* Thread3: [B] Message in B envelope
* Thread3: [C] Message in C envelope
*/
}