Skip to content

golang与qpid broker通信

Published: at 01:33 AM | 5 min read

这里我用的是 github.com/Azure/go-amqp 这个库,它支持 AMQP 1.0 协议,是纯 Go 语言实现的。qpid 用的是 1.39.0 版本,直接启动时它不支持 AMQP 1.0 协议,需要先加载 amqp.so 库(加载方法)。

下面演示使用sender和receiver的简单例子。

receiver

接收队列examples的消息并直接丢弃(不限数量):go run main.go -queue examples
接收队列examples的10个消息:go run main.go -queue examples -count 10

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)
 // ADDR is the AMQP URL that main passes to amqp.Dial.
 const ADDR = "amqp://118.24.114.114"
// main receives messages from the queue named by -queue and accepts each one.
//
// With -count N (N > 0) it stops after N messages; with the default of 0 it
// keeps receiving until Receive returns an error. Note that log.Fatal exits
// the process without running the deferred closes.
func main() {
	queueName := flag.String("queue", "", "queue name")
	receiveCount := flag.Int("count", 0, "limit receive count")
	flag.Parse()

	if *queueName == "" {
		log.Fatalln("Error queue is empty")
	}

	client, err := amqp.Dial(ADDR)
	if err != nil {
		log.Fatalln("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	ctx := context.Background()
	// Create a receiver
	receiver, err := session.NewReceiver(
		amqp.LinkSourceAddress(*queueName),
		amqp.LinkCredit(10),
	)
	if err != nil {
		log.Fatal("Creating receiver link:", err)
	}
	defer func() {
		// Give the link close at most one second.
		closeCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		receiver.Close(closeCtx)
	}()

	// A non-positive -count means "no limit".
	count := 0
	for *receiveCount <= 0 || count < *receiveCount {
		// Receive next message
		msg, err := receiver.Receive(ctx)
		if err != nil {
			log.Fatal("Reading message from AMQP:", err)
		}

		// Accept message
		msg.Accept()
		count++
		fmt.Printf("\rreceive:%d", count)
	}

	// The progress line above ends with the cursor still on it, so start a
	// fresh line before the summary.
	fmt.Println("\nreceive finished count:", count)
}

sender

向队列examples发送消息:go run main.go -queue examples -msg hello
向队列examples重复发送同一条消息10次:go run main.go -queue examples -msg hello -repeat 10
向队列examples发送消息指定subject:go run main.go -queue examples -msg hello -subject news

package main

import (
	"context"
	"flag"
	"fmt"
	"log"

	"github.com/Azure/go-amqp"
)

// ADDR is the AMQP URL that main passes to amqp.Dial.
const ADDR = "amqp://118.24.114.114"
// main sends the -msg payload to the queue named by -queue, -repeat times.
//
// When -subject is non-empty it is set as the message subject. Both -queue
// and -msg are required. Note that log.Fatal exits the process without
// running the deferred client close.
func main() {
	queueName := flag.String("queue", "", "queue name")
	subject := flag.String("subject", "", "msg subject")
	message := flag.String("msg", "", "send message")
	repeat := flag.Int("repeat", 1, "send repeat count")
	flag.Parse()

	if *queueName == "" {
		log.Fatalln("Error queue is empty")
	}
	if *message == "" {
		log.Fatalln("Error send message is empty")
	}

	client, err := amqp.Dial(ADDR)
	if err != nil {
		log.Fatalln("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		log.Fatal("Creating AMQP session:", err)
	}

	ctx := context.Background()
	// Create a sender
	sender, err := session.NewSender(
		amqp.LinkTargetAddress(*queueName),
	)
	if err != nil {
		log.Fatal("Creating sender link:", err)
	}

	// Send message; sent doubles as the progress counter shown to the user.
	for sent := 1; sent <= *repeat; sent++ {
		msg := amqp.NewMessage([]byte(*message))
		if *subject != "" {
			msg.Properties = &amqp.MessageProperties{Subject: *subject}
		}
		if err := sender.Send(ctx, msg); err != nil {
			log.Fatal("Sending message:", err)
		}
		fmt.Printf("\r%d", sent)
	}

	// Report a failed link close instead of silently dropping it.
	if err := sender.Close(ctx); err != nil {
		log.Println("Closing sender link:", err)
	}
	fmt.Println("\nsend finished")
}

请求应答

发送一个请求,然后通过一个动态地址接收应答

// handler.go
package handler

import (
	"context"
	"fmt"
	"github.com/Azure/go-amqp"
	"log"
	"sync"
	"time"
)

// TimeoutWait is a sync.WaitGroup whose Wait gives up after a fixed timeout.
type TimeoutWait struct {
	wg      sync.WaitGroup
	timeout time.Duration // upper bound on how long Wait blocks
}

// NewTimeoutWait returns a TimeoutWait whose Wait blocks for at most timeout.
func NewTimeoutWait(timeout time.Duration) *TimeoutWait {
	return &TimeoutWait{timeout: timeout}
}

// Add adds delta to the underlying WaitGroup counter.
func (tw *TimeoutWait) Add(delta int) {
	tw.wg.Add(delta)
}

// Done decrements the underlying WaitGroup counter by one.
func (tw *TimeoutWait) Done() {
	tw.wg.Done()
}

// Wait blocks until the WaitGroup counter reaches zero or the timeout
// elapses, whichever comes first.
//
// Each call uses its own completion channel, so Wait may be called more than
// once without closing an already-closed channel. If the timeout fires first,
// the helper goroutine stays blocked until the counter eventually reaches
// zero.
func (tw *TimeoutWait) Wait() {
	done := make(chan struct{})
	go func() {
		tw.wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(tw.timeout)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
	}
}

// handler bundles one AMQP client and session with the sender links opened
// on that session.
type handler struct {
	// Connection state, filled in by New.
	addr    string
	ctx     context.Context
	client  *amqp.Client
	session *amqp.Session

	// senders holds one sender link per target address; getSender holds mux
	// while reading and writing it.
	mux     sync.Mutex
	senders map[string]*amqp.Sender

	// exit is what Close waits on before tearing the links down.
	exit *TimeoutWait
}

// OnMessage is the callback type accepted by Send. Send invokes it with the
// message and error returned by Receive on the reply link.
type OnMessage func(message *amqp.Message, err error)

// New dials the AMQP server at addr, opens a session on the connection and
// returns a handler that uses them.
//
// It returns nil when dialing or session creation fails; the failure is
// logged rather than terminating the process, so callers must check the
// result for nil. Close is safe to call on a nil handler.
func New(addr string) *handler {
	client, err := amqp.Dial(addr)
	if err != nil {
		log.Println("Dialing AMQP server:", err)
		return nil
	}

	session, err := client.NewSession()
	if err != nil {
		log.Println("Creating AMQP session:", err)
		// Do not leave the connection open when no session could be created.
		client.Close()
		return nil
	}

	return &handler{
		addr:    addr,
		client:  client,
		session: session,
		ctx:     context.Background(),
		senders: make(map[string]*amqp.Sender),
		exit:    NewTimeoutWait(5 * time.Second),
	}
}

// Close waits (bounded by the exit timeout) for outstanding work registered
// on h.exit, then closes every cached sender link, the session and finally
// the client connection. Close errors are logged, not returned.
//
// Calling Close on a nil handler is a no-op.
func (h *handler) Close() {
	if h == nil {
		return
	}

	fmt.Println("close start")
	h.exit.Wait()

	// senders is guarded by mux in getSender, so take the same lock here.
	h.mux.Lock()
	for address, sender := range h.senders {
		if err := sender.Close(h.ctx); err != nil {
			log.Printf("closing sender %q: %v", address, err)
		}
	}
	h.senders = nil
	h.mux.Unlock()

	if err := h.session.Close(h.ctx); err != nil {
		log.Println("closing session:", err)
	}
	// The handler owns the connection created in New, so release it as well.
	if err := h.client.Close(); err != nil {
		log.Println("closing client:", err)
	}
	fmt.Println("close end")
}

// getSender returns the sender link for address, creating it on the session
// and caching it in h.senders the first time the address is requested.
// The whole lookup-or-create runs under h.mux.
func (h *handler) getSender(address string) (*amqp.Sender, error) {
	h.mux.Lock()
	defer h.mux.Unlock()

	link, cached := h.senders[address]
	if !cached {
		created, err := h.session.NewSender(amqp.LinkTargetAddress(address))
		if err != nil {
			return nil, err
		}
		h.senders[address] = created
		link = created
	}
	return link, nil
}

// Send sends msg to queue and delivers the reply to onMessage.
//
// A receiver link with a dynamic address is created for the reply and its
// address is stored in msg.Properties.ReplyTo; any other properties already
// set on msg are kept. A goroutine then waits up to three seconds for one
// message on that link, passes the result (message or error) to onMessage and
// closes the link. The goroutine is registered on h.exit before it starts so
// that Close cannot miss it.
//
// The returned error comes from creating the links or from sending msg.
func (h *handler) Send(queue string, msg *amqp.Message, onMessage OnMessage) error {
	receiver, err := h.session.NewReceiver(amqp.LinkSourceAddress(""), amqp.LinkAddressDynamic())
	if err != nil {
		return err
	}

	sender, err := h.getSender(queue)
	if err != nil {
		// Best effort: nobody will read from the reply link, so release it.
		receiver.Close(h.ctx)
		return err
	}

	// Only fill in ReplyTo; do not discard properties the caller has set.
	if msg.Properties == nil {
		msg.Properties = &amqp.MessageProperties{}
	}
	msg.Properties.ReplyTo = receiver.Address()

	// Add must happen before the goroutine starts; doing it inside the
	// goroutine races with Wait in Close.
	h.exit.Add(1)
	go func() {
		defer h.exit.Done()

		fmt.Println("receiver start")
		ctx, cancel := context.WithTimeout(h.ctx, 3*time.Second)
		defer func() {
			fmt.Println("receiver end")
			receiver.Close(h.ctx)
			cancel()
		}()
		rspMsg, err := receiver.Receive(ctx)
		onMessage(rspMsg, err)
	}()

	return sender.Send(h.ctx, msg)
}

// main.go
package main

import (
	"fmt"
	"github.com/Azure/go-amqp"
	"gotest/handler"
	"log"
)

// main sends "hello" to the acc_queue_echo queue and prints the reply that
// arrives on the dynamically created reply address.
func main() {
	// Named h rather than handler so the imported package is not shadowed.
	h := handler.New("amqp://118.24.114.114")
	if h == nil {
		panic("handler create failed")
	}
	defer h.Close()

	msg := amqp.NewMessage([]byte("hello"))
	err := h.Send("acc_queue_echo", msg, func(rspMsg *amqp.Message, err error) {
		if err != nil {
			// This callback runs on the goroutine started by Send, while
			// Close may be waiting for it. Log and return instead of exiting
			// so the deferred Close can finish.
			log.Println("receiving reply:", err)
			return
		}
		fmt.Println("received message:", string(rspMsg.GetData()))
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("success")
}