Skip to content

ZMQ DEMO 发布订阅模式-进程内

Published: at 04:24 PM | 3 min read

pub sub inproc进程内通信

简介

进程内传输方式意味着在共享ZMQ context的线程间通过内存方式传输数据。

通讯地址必须保证已经被相同context上的一个socket创建了。

文件名必须是在与这个socket关联的ZMQ context上是唯一的,而且长度不能大于256字节。

示例说明

启动三个线程,一个推送线程,两个订阅线程。
推送线程1:推送A,B,C三种消息;
订阅线程2:订阅A,B消息;
订阅线程3:订阅所有消息;
所以,线程2只接受A,B消息,线程3接收A,B,C消息。

代码

// reqclient.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//
#include <string>
#include <iostream>
#include <algorithm>
#include <thread>
#include <mutex>

#include "zmq.hpp"
#include "zmq_addon.hpp"

static std::mutex s_mutex;
    
void PublisherThread(zmq::context_t *ctx) {
    //  Prepare publisher
    zmq::socket_t publisher(*ctx, zmq::socket_type::pub);
    publisher.bind("inproc://#1");

    // Give the subscribers a chance to connect, so they don't lose any messages
    std::this_thread::sleep_for(std::chrono::milliseconds(20));

    while (true) {
        //  Write three messages, each with an envelope and content
        publisher.send(zmq::str_buffer("A"), zmq::send_flags::sndmore);
        publisher.send(zmq::str_buffer("Message in A envelope"));
        publisher.send(zmq::str_buffer("B"), zmq::send_flags::sndmore);
        publisher.send(zmq::str_buffer("Message in B envelope"));
        publisher.send(zmq::str_buffer("C"), zmq::send_flags::sndmore);
        publisher.send(zmq::str_buffer("Message in C envelope"));
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

void SubscriberThread1(zmq::context_t *ctx) {
    //  Prepare subscriber
    zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
    subscriber.connect("inproc://#1");

    //  Thread2 opens "A" and "B" envelopes
    subscriber.set(zmq::sockopt::subscribe, "A");
    subscriber.set(zmq::sockopt::subscribe, "B");

    while (1) {
        // Receive all parts of the message
        std::vector<zmq::message_t> recv_msgs;
        zmq::recv_result_t result =
            zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
        assert(result && "recv failed");
        assert(*result == 2);

        std::lock_guard<std::mutex> lock(s_mutex);
        std::cout << "Thread2: [" << recv_msgs[0].to_string() << "] "
            << recv_msgs[1].to_string() << std::endl;
    }
}

void SubscriberThread2(zmq::context_t *ctx) {
    //  Prepare our context and subscriber
    zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
    subscriber.connect("inproc://#1");

    //  Thread3 opens ALL envelopes
    subscriber.set(zmq::sockopt::subscribe, "");

    while (1) {
        // Receive all parts of the message
        std::vector<zmq::message_t> recv_msgs;
        zmq::recv_result_t result =
            zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
        assert(result && "recv failed");
        assert(*result == 2);

        std::lock_guard<std::mutex> lock(s_mutex);
        std::cout << "Thread3: [" << recv_msgs[0].to_string() << "] "
            << recv_msgs[1].to_string() << std::endl;
    }
}

int main() {
    zmq::context_t ctx(0);

    auto thread1 = std::thread(PublisherThread, &ctx);

    // Give the publisher a chance to bind, since inproc requires it
    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    auto thread2 = std::thread(SubscriberThread1, &ctx);
    auto thread3 = std::thread(SubscriberThread2, &ctx);

    thread1.join();
    thread2.join();
    thread3.join();

    /*
     * Output:
     *   An infinite loop of a mix of:
     *     Thread2: [A] Message in A envelope
     *     Thread2: [B] Message in B envelope
     *     Thread3: [A] Message in A envelope
     *     Thread3: [B] Message in B envelope
     *     Thread3: [C] Message in C envelope
     */
}