这里我用的是 github.com/Azure/go-amqp 这个库,它支持 AMQP 1.0 协议,是纯 Go 语言实现的。Qpid 用的是 1.39.0 版本,直接启动的话它是不支持 AMQP 1.0 协议的,需要先加载 amqp.so 库(加载方法)。
下面演示使用sender和receiver的简单例子。
receiver
接收队列 examples 的消息并直接丢弃(不限数量):go run main.go -queue examples
接收队列examples的10个消息:go run main.go -queue examples -count 10
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"github.com/Azure/go-amqp"
)
// ADDR is the default AMQP broker URL. It can be overridden with the -addr flag.
const ADDR = "amqp://118.24.114.114"

// main receives messages from the queue named by -queue and accepts each one
// without processing it further.
//
// Flags:
//
//	-addr   broker URL (defaults to ADDR)
//	-queue  source queue name (required)
//	-count  stop after this many messages; 0 or less means receive forever
//
// Any connection, link, receive or accept error terminates the process via
// log.Fatalln. Note that log.Fatalln exits without running deferred calls;
// that is acceptable here because the process is ending anyway.
func main() {
	addr := flag.String("addr", ADDR, "AMQP broker address")
	queueName := flag.String("queue", "", "queue name")
	receiveCount := flag.Int("count", 0, "limit receive count")
	flag.Parse()
	if *queueName == "" {
		log.Fatalln("Error queue is empty")
	}

	client, err := amqp.Dial(*addr)
	if err != nil {
		log.Fatalln("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session.
	session, err := client.NewSession()
	if err != nil {
		// Fatalln (not Fatal) so a space separates the prefix from the error.
		log.Fatalln("Creating AMQP session:", err)
	}
	ctx := context.Background()
	defer session.Close(ctx)

	// Create a receiver.
	receiver, err := session.NewReceiver(
		amqp.LinkSourceAddress(*queueName),
		amqp.LinkCredit(10),
	)
	if err != nil {
		log.Fatalln("Creating receiver link:", err)
	}
	defer func() {
		// Bound the link detach so shutdown cannot hang on an unresponsive broker.
		closeCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		receiver.Close(closeCtx)
	}()

	count := 0
	// A non-positive -count means "no limit", so the loop only ends on error.
	for *receiveCount <= 0 || count < *receiveCount {
		// Receive next message.
		msg, err := receiver.Receive(ctx)
		if err != nil {
			log.Fatalln("Reading message from AMQP:", err)
		}
		// Accept message so the broker does not redeliver it.
		if err := msg.Accept(); err != nil {
			log.Fatalln("Accepting message:", err)
		}
		count++
		fmt.Printf("\rreceive:%d", count)
	}
	// Leading newline: the progress line above ends with no line break.
	fmt.Println("\nreceive finished count:", count)
}
sender
向队列examples发送消息:go run main.go -queue examples -msg hello
向队列examples发送消息重复10次:go run main.go -queue examples -msg hello -repeat 10
向队列examples发送消息指定subject:go run main.go -queue examples -msg hello -subject news
package main
import (
"context"
"flag"
"fmt"
"log"
"github.com/Azure/go-amqp"
)
// ADDR is the default AMQP broker URL. It can be overridden with the -addr flag.
const ADDR = "amqp://118.24.114.114"

// main sends the -msg payload to the queue named by -queue, -repeat times.
//
// Flags:
//
//	-addr     broker URL (defaults to ADDR)
//	-queue    target queue name (required)
//	-msg      message body (required)
//	-subject  optional message subject
//	-repeat   number of times to send the message (default 1)
//
// Connection, link and send errors terminate the process via log.Fatalln.
// Note that log.Fatalln exits without running deferred calls; that is
// acceptable here because the process is ending anyway.
func main() {
	addr := flag.String("addr", ADDR, "AMQP broker address")
	queueName := flag.String("queue", "", "queue name")
	subject := flag.String("subject", "", "msg subject")
	message := flag.String("msg", "", "send message")
	repeat := flag.Int("repeat", 1, "send repeat count")
	flag.Parse()
	if *queueName == "" {
		log.Fatalln("Error queue is empty")
	}
	if *message == "" {
		log.Fatalln("Error send message is empty")
	}

	client, err := amqp.Dial(*addr)
	if err != nil {
		log.Fatalln("Dialing AMQP server:", err)
	}
	defer client.Close()

	// Open a session.
	session, err := client.NewSession()
	if err != nil {
		// Fatalln (not Fatal) so a space separates the prefix from the error.
		log.Fatalln("Creating AMQP session:", err)
	}
	ctx := context.Background()
	defer session.Close(ctx)

	// Create a sender.
	sender, err := session.NewSender(
		amqp.LinkTargetAddress(*queueName),
	)
	if err != nil {
		log.Fatalln("Creating sender link:", err)
	}

	// Send the message -repeat times, showing a running counter on one line.
	for i := 0; i < *repeat; i++ {
		msg := amqp.NewMessage([]byte(*message))
		if *subject != "" {
			msg.Properties = &amqp.MessageProperties{}
			msg.Properties.Subject = *subject
		}
		if err := sender.Send(ctx, msg); err != nil {
			log.Fatalln("Sending message:", err)
		}
		fmt.Printf("\r%d", i+1)
	}

	// Report a failed detach instead of dropping it silently, but keep it
	// best-effort: every Send above has already returned successfully.
	if err := sender.Close(ctx); err != nil {
		log.Println("Closing sender link:", err)
	}
	fmt.Println("\nsend finished")
}
请求应答
发送一个请求,然后通过一个动态地址接收应答
// handler.go
package handler
import (
"context"
"fmt"
"github.com/Azure/go-amqp"
"log"
"sync"
"time"
)
// TimeoutWait wraps a sync.WaitGroup so that waiting for outstanding work
// gives up after a fixed timeout instead of blocking forever.
type TimeoutWait struct {
	wg      sync.WaitGroup
	timeout time.Duration // upper bound for a single Wait call
}

// NewTimeoutWait returns a TimeoutWait whose Wait blocks for at most timeout.
func NewTimeoutWait(timeout time.Duration) *TimeoutWait {
	return &TimeoutWait{timeout: timeout}
}

// Add adds delta to the underlying WaitGroup counter. As with sync.WaitGroup,
// call it before starting the work it accounts for, not from inside it.
func (tw *TimeoutWait) Add(delta int) {
	tw.wg.Add(delta)
}

// Done decrements the underlying WaitGroup counter by one.
func (tw *TimeoutWait) Done() {
	tw.wg.Done()
}

// Wait blocks until the WaitGroup counter reaches zero or the configured
// timeout elapses, whichever happens first.
//
// Each call uses its own completion channel, so Wait may be called any number
// of times; closing one shared channel on every call would panic on the
// second call. If the timeout wins, the helper goroutine stays parked until
// the counter eventually reaches zero.
func (tw *TimeoutWait) Wait() {
	done := make(chan struct{})
	go func() {
		tw.wg.Wait()
		close(done)
	}()

	// A stoppable timer releases its resources as soon as Wait returns.
	timer := time.NewTimer(tw.timeout)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
	}
}
// handler bundles one AMQP client connection and session with a cache of
// sender links, and tracks in-flight reply goroutines so that Close can wait
// for them.
type handler struct {
	// Connection state, populated once by New.
	addr    string
	client  *amqp.Client
	session *amqp.Session
	ctx     context.Context

	// mux is held by getSender while it reads or fills the senders cache,
	// which holds one sender link per target address.
	mux     sync.Mutex
	senders map[string]*amqp.Sender

	// exit counts reply goroutines started by Send; Close waits on it.
	exit *TimeoutWait
}
// OnMessage is the reply callback passed to handler.Send. Send invokes it
// from the goroutine it starts, passing the result of the reply receiver's
// Receive call: the reply message and the error that call returned.
type OnMessage func(message *amqp.Message, err error)
// New dials the AMQP broker at addr, opens a session on the connection and
// returns a handler ready for Send.
//
// It returns nil when dialing or session creation fails; the failure is
// logged. Library code should not terminate the process, so the errors are
// reported this way instead of through log.Fatal, and callers must check the
// result for nil. Close is safe to call on a nil handler.
func New(addr string) *handler {
	client, err := amqp.Dial(addr)
	if err != nil {
		log.Println("dialing AMQP server:", err)
		return nil
	}

	session, err := client.NewSession()
	if err != nil {
		log.Println("creating AMQP session:", err)
		// Do not leak the connection that was just established.
		if cerr := client.Close(); cerr != nil {
			log.Println("closing AMQP client:", cerr)
		}
		return nil
	}

	return &handler{
		addr:    addr,
		client:  client,
		session: session,
		ctx:     context.Background(),
		senders: make(map[string]*amqp.Sender),
		exit:    NewTimeoutWait(5 * time.Second),
	}
}
// Close waits (bounded by the exit TimeoutWait) for in-flight reply
// goroutines, then closes every cached sender link, the session and finally
// the client connection. It is a no-op on a nil handler.
//
// Close errors are logged rather than returned so that one failure does not
// prevent the remaining resources from being released.
func (h *handler) Close() {
	if h == nil {
		return
	}
	fmt.Println("close start")
	h.exit.Wait()

	// Bound the shutdown so an unresponsive broker cannot hang the caller.
	ctx, cancel := context.WithTimeout(h.ctx, 5*time.Second)
	defer cancel()

	// getSender accesses the map under mux, so the teardown must as well.
	h.mux.Lock()
	for addr, s := range h.senders {
		if err := s.Close(ctx); err != nil {
			log.Printf("closing sender %q: %v", addr, err)
		}
	}
	h.senders = nil
	h.mux.Unlock()

	if err := h.session.Close(ctx); err != nil {
		log.Println("closing AMQP session:", err)
	}
	// The connection itself must be closed too; closing only the session
	// leaves it open.
	if err := h.client.Close(); err != nil {
		log.Println("closing AMQP client:", err)
	}
	fmt.Println("close end")
}
// getSender returns the sender link for address, creating it on the session
// and caching it on first use. The cache is accessed with mux held. A creation
// error is returned as-is and nothing is cached.
func (h *handler) getSender(address string) (*amqp.Sender, error) {
	h.mux.Lock()
	defer h.mux.Unlock()

	cached, found := h.senders[address]
	if found {
		return cached, nil
	}

	target := amqp.LinkTargetAddress(address)
	created, err := h.session.NewSender(target)
	if err != nil {
		return nil, err
	}

	h.senders[address] = created
	return created, nil
}
// Send publishes msg to queue and delivers the reply asynchronously.
//
// It opens a receiver on a dynamic (broker-assigned) address, sets that
// address as the message's ReplyTo, and starts a goroutine that waits up to
// 3 seconds for one reply before calling onMessage with the received message
// and error. The receiver link is closed once the callback has been invoked.
//
// Existing msg.Properties are preserved; only ReplyTo is overwritten.
// The returned error comes from creating the links or from sending msg.
func (h *handler) Send(queue string, msg *amqp.Message, onMessage OnMessage) error {
	receiver, err := h.session.NewReceiver(amqp.LinkSourceAddress(""), amqp.LinkAddressDynamic())
	if err != nil {
		return err
	}

	sender, err := h.getSender(queue)
	if err != nil {
		// No reply goroutine will own the receiver, so release it here.
		if cerr := receiver.Close(h.ctx); cerr != nil {
			log.Println("closing reply receiver:", cerr)
		}
		return err
	}

	// Keep any properties the caller already set (subject, correlation id, ...).
	if msg.Properties == nil {
		msg.Properties = &amqp.MessageProperties{}
	}
	msg.Properties.ReplyTo = receiver.Address()

	// Register with the wait group before starting the goroutine; doing it
	// inside the goroutine races with a concurrent Close, which could return
	// from Wait before the goroutine has been counted.
	h.exit.Add(1)
	go func() {
		defer h.exit.Done()
		fmt.Println("receiver start")
		ctx, cancel := context.WithTimeout(h.ctx, 3*time.Second)
		defer func() {
			fmt.Println("receiver end")
			if cerr := receiver.Close(h.ctx); cerr != nil {
				log.Println("closing reply receiver:", cerr)
			}
			cancel()
		}()
		rspMsg, err := receiver.Receive(ctx)
		onMessage(rspMsg, err)
	}()

	return sender.Send(h.ctx, msg)
}
// main.go
package main
import (
"fmt"
"github.com/Azure/go-amqp"
"gotest/handler"
"log"
)
// main sends "hello" to the acc_queue_echo queue and prints the reply that
// arrives on a dynamically assigned reply address.
//
// The deferred Close waits for the pending reply goroutine before tearing
// the connection down, which is why main does not wait for the reply itself.
func main() {
	// Named h rather than handler so the imported package is not shadowed.
	h := handler.New("amqp://118.24.114.114")
	if h == nil {
		panic("handler create failed")
	}
	defer h.Close()

	msg := amqp.NewMessage([]byte("hello"))
	err := h.Send("acc_queue_echo", msg, func(rspMsg *amqp.Message, err error) {
		if err != nil {
			// Log and return instead of log.Fatal: exiting from this callback
			// would kill the process while the deferred Close is still
			// running, skipping the connection cleanup.
			log.Println("receiving reply:", err)
			return
		}
		fmt.Println("received message:", string(rspMsg.GetData()))
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("success")
}