文章标题 原创 翻译 转载 文章内容

这里我用的是github.com/Azure/go-amqp这个库,它支持AMQP 1.0协议,是纯go语言实现的。qpid用的是1.39.0版本直接启动的话它是不支持AMQP 1.0协议的,需要加载amqp.so库([加载方法](https://www.ningto.com/post/5ea109bb707b535469eb754c))。

下面演示使用sender和receiver的简单例子。

# receiver

接收队列examples的消息直接丢弃:go run main.go -queue examples

接收队列examples的10个消息:go run main.go -queue examples -count 10

```go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/Azure/go-amqp"
)

// serverAddr is the AMQP 1.0 broker the example connects to.
const serverAddr = "amqp://118.24.114.114"

func main() {
	queueName := flag.String("queue", "", "queue name")
	receiveCount := flag.Int("count", 0, "limit receive count")
	flag.Parse()

	if *queueName == "" {
		log.Fatalln("Error queue is empty")
	}

	// log.Fatal exits via os.Exit, which skips deferred calls. Everything
	// that registers cleanup therefore lives in run; main only reports the
	// final error.
	if err := run(*queueName, *receiveCount); err != nil {
		log.Fatalln(err)
	}
}

// run receives and accepts messages from queue. It stops after limit
// messages; a limit of zero or less means "keep receiving until an error
// occurs". The connection and the receiver link are closed on every return
// path.
func run(queue string, limit int) error {
	client, err := amqp.Dial(serverAddr)
	if err != nil {
		return fmt.Errorf("dialing AMQP server: %w", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		return fmt.Errorf("creating AMQP session: %w", err)
	}

	ctx := context.Background()

	// Create a receiver
	receiver, err := session.NewReceiver(
		amqp.LinkSourceAddress(queue),
		amqp.LinkCredit(10),
	)
	if err != nil {
		return fmt.Errorf("creating receiver link: %w", err)
	}
	defer func() {
		// Bound the detach handshake so shutdown cannot hang forever.
		closeCtx, cancel := context.WithTimeout(ctx, 1*time.Second)
		defer cancel()
		receiver.Close(closeCtx)
	}()

	count := 0
	for limit <= 0 || count < limit {
		// Receive next message
		msg, err := receiver.Receive(ctx)
		if err != nil {
			return fmt.Errorf("reading message from AMQP: %w", err)
		}

		// Accept message
		if err := msg.Accept(); err != nil {
			return fmt.Errorf("accepting message: %w", err)
		}

		count++
		fmt.Printf("\rreceive:%d", count)
	}

	// The progress line above ends with "\r", so start on a fresh line.
	fmt.Println("\nreceive finished count:", count)
	return nil
}
```

# sender

向队列examples发送消息:go run main.go -queue examples -msg hello

向队列examples发送消息重复10次:go run main.go -queue examples -msg hello -repeat 10

向队列examples发送消息指定subject:go run main.go -queue examples -msg hello -subject news

```go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"

	"github.com/Azure/go-amqp"
)

// serverAddr is the AMQP 1.0 broker the example connects to.
const serverAddr = "amqp://118.24.114.114"

func main() {
	queueName := flag.String("queue", "", "queue name")
	subject := flag.String("subject", "", "msg subject")
	message := flag.String("msg", "", "send message")
	repeat := flag.Int("repeat", 1, "send repeat count")
	flag.Parse()

	if *queueName == "" {
		log.Fatalln("Error queue is empty")
	}
	if *message == "" {
		log.Fatalln("Error send message is empty")
	}

	// log.Fatal exits via os.Exit, which skips deferred calls. Everything
	// that registers cleanup therefore lives in run; main only reports the
	// final error.
	if err := run(*queueName, *subject, *message, *repeat); err != nil {
		log.Fatalln(err)
	}
}

// run sends body to queue repeat times, setting the message subject when
// subject is non-empty. The connection is closed on every return path.
func run(queue, subject, body string, repeat int) error {
	client, err := amqp.Dial(serverAddr)
	if err != nil {
		return fmt.Errorf("dialing AMQP server: %w", err)
	}
	defer client.Close()

	// Open a session
	session, err := client.NewSession()
	if err != nil {
		return fmt.Errorf("creating AMQP session: %w", err)
	}

	ctx := context.Background()

	// Create a sender
	sender, err := session.NewSender(
		amqp.LinkTargetAddress(queue),
	)
	if err != nil {
		return fmt.Errorf("creating sender link: %w", err)
	}

	// Send message
	for i := 0; i < repeat; i++ {
		msg := amqp.NewMessage([]byte(body))
		if subject != "" {
			msg.Properties = &amqp.MessageProperties{Subject: subject}
		}

		if err := sender.Send(ctx, msg); err != nil {
			// The link is not closed explicitly here; the deferred
			// client.Close runs on this return path.
			return fmt.Errorf("sending message: %w", err)
		}
		fmt.Printf("\r%d", i+1)
	}

	if err := sender.Close(ctx); err != nil {
		return fmt.Errorf("closing sender link: %w", err)
	}
	fmt.Println("\nsend finished")
	return nil
}
```

# 请求应答

发送一个请求,然后通过一个动态地址接收应答

```go
// handler.go

// Package handler is a small request/reply helper built on go-amqp: each
// request is sent with a reply-to address taken from a dynamic receiver
// link, and the reply is delivered to a callback.
package handler

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/Azure/go-amqp"
)

// TimeoutWait is a sync.WaitGroup whose Wait gives up after a fixed timeout.
type TimeoutWait struct {
	wg      sync.WaitGroup
	timeout time.Duration
}

// NewTimeoutWait returns a TimeoutWait whose Wait blocks for at most timeout.
func NewTimeoutWait(timeout time.Duration) *TimeoutWait {
	return &TimeoutWait{timeout: timeout}
}

// Add adds delta to the wait counter. As with sync.WaitGroup, call it before
// starting the goroutine it accounts for, not from inside that goroutine.
func (tw *TimeoutWait) Add(delta int) {
	tw.wg.Add(delta)
}

// Done decrements the wait counter by one.
func (tw *TimeoutWait) Done() {
	tw.wg.Done()
}

// Wait blocks until the counter reaches zero or the timeout elapses,
// whichever comes first. It may be called more than once.
func (tw *TimeoutWait) Wait() {
	// A fresh channel per call: closing a shared one would panic on the
	// second Wait.
	done := make(chan struct{})
	go func() {
		// On timeout this goroutine stays parked until the counter
		// finally reaches zero.
		tw.wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(tw.timeout)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
	}
}

// handler owns one AMQP connection and session, caches one sender link per
// target address, and tracks in-flight reply goroutines so Close can wait
// for them.
type handler struct {
	addr    string
	client  *amqp.Client
	session *amqp.Session
	ctx     context.Context
	senders map[string]*amqp.Sender // guarded by mux
	mux     sync.Mutex
	exit    *TimeoutWait
}

// OnMessage receives the reply to a request, or the error that prevented the
// reply from being received. message must not be used when err is non-nil.
type OnMessage func(message *amqp.Message, err error)

// New connects to the broker at addr and opens a session. It logs the error
// and returns nil when either step fails, so callers must check the result.
func New(addr string) *handler {
	client, err := amqp.Dial(addr)
	if err != nil {
		log.Println("dialing AMQP server:", err)
		return nil
	}

	session, err := client.NewSession()
	if err != nil {
		log.Println("creating AMQP session:", err)
		client.Close()
		return nil
	}

	return &handler{
		addr:    addr,
		client:  client,
		session: session,
		ctx:     context.Background(),
		senders: make(map[string]*amqp.Sender),
		exit:    NewTimeoutWait(5 * time.Second),
	}
}

// Close waits (up to the exit timeout) for pending replies, then closes all
// cached senders, the session and the connection. It is a no-op on a nil
// handler.
func (h *handler) Close() {
	if h == nil {
		return
	}

	fmt.Println("close start")
	h.exit.Wait()

	// Detach the map under the lock so a concurrent getSender never
	// observes it while it is being iterated.
	h.mux.Lock()
	senders := h.senders
	h.senders = nil
	h.mux.Unlock()

	for address, sender := range senders {
		if err := sender.Close(h.ctx); err != nil {
			log.Printf("closing sender %q: %v", address, err)
		}
	}
	if err := h.session.Close(h.ctx); err != nil {
		log.Println("closing session:", err)
	}
	if err := h.client.Close(); err != nil {
		log.Println("closing connection:", err)
	}
	fmt.Println("close end")
}

// getSender returns the cached sender link for address, creating and caching
// it on first use.
func (h *handler) getSender(address string) (*amqp.Sender, error) {
	h.mux.Lock()
	defer h.mux.Unlock()

	if h.senders == nil {
		// Close has run; writing to the nil map below would panic.
		return nil, fmt.Errorf("handler is closed")
	}
	if sender, ok := h.senders[address]; ok {
		return sender, nil
	}

	sender, err := h.session.NewSender(amqp.LinkTargetAddress(address))
	if err != nil {
		return nil, err
	}
	h.senders[address] = sender
	return sender, nil
}

// Send sends msg to queue with its ReplyTo set to a freshly created dynamic
// address, and waits up to 3 seconds for one reply on that address in a
// background goroutine.
//
// onMessage is called exactly once from that goroutine whenever the reply
// receiver could be started. If sending fails, Send returns the error and
// the pending receive is cancelled, so onMessage is invoked promptly with
// the cancellation error instead of after the full timeout.
func (h *handler) Send(queue string, msg *amqp.Message, onMessage OnMessage) error {
	if h == nil {
		return fmt.Errorf("handler is nil")
	}

	receiver, err := h.session.NewReceiver(amqp.LinkSourceAddress(""), amqp.LinkAddressDynamic())
	if err != nil {
		return err
	}

	// Keep any properties the caller already set; only ReplyTo is ours.
	if msg.Properties == nil {
		msg.Properties = &amqp.MessageProperties{}
	}
	msg.Properties.ReplyTo = receiver.Address()

	sender, err := h.getSender(queue)
	if err != nil {
		// Nobody will ever read from the reply link; do not leak it.
		closeCtx, cancelClose := context.WithTimeout(h.ctx, 1*time.Second)
		receiver.Close(closeCtx)
		cancelClose()
		return err
	}

	ctx, cancel := context.WithTimeout(h.ctx, 3*time.Second)

	// Add must happen before the goroutine starts; otherwise Close could
	// call Wait first and return without waiting for this reply.
	h.exit.Add(1)
	go func() {
		defer h.exit.Done()

		fmt.Println("receiver start")
		defer func() {
			fmt.Println("receiver end")
			receiver.Close(h.ctx)
			cancel()
		}()

		rspMsg, err := receiver.Receive(ctx)
		onMessage(rspMsg, err)
	}()

	if err := sender.Send(h.ctx, msg); err != nil {
		// No reply can arrive; stop the receive goroutine right away.
		cancel()
		return err
	}
	return nil
}
```

```go
// main.go
package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/Azure/go-amqp"
	"gotest/handler"
)

func main() {
	// log.Fatal exits via os.Exit, which skips deferred calls, so the
	// handler's lifetime is managed inside run.
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run sends one request to acc_queue_echo and prints the reply. The deferred
// Close waits for the reply callback before tearing the connection down.
func run() error {
	// Named h rather than handler so the package name is not shadowed.
	h := handler.New("amqp://118.24.114.114")
	if h == nil {
		return errors.New("handler create failed")
	}
	defer h.Close()

	msg := amqp.NewMessage([]byte("hello"))
	err := h.Send("acc_queue_echo", msg, func(rspMsg *amqp.Message, err error) {
		if err != nil {
			// Runs on the reply goroutine while Close may be waiting
			// on it; report instead of exiting the process.
			log.Println("receiving reply:", err)
			return
		}
		fmt.Println("received message:", string(rspMsg.GetData()))
	})
	if err != nil {
		return fmt.Errorf("sending request: %w", err)
	}

	fmt.Println("success")
	return nil
}
```
文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交