当前位置:网站首页> golang mqtt/stomp/nats/amqp

golang mqtt/stomp/nats/amqp

2022-07-06 00:36:00 Golang中文社区

# [xutils/mq](https://github.com/xaces/xutils/tree/master/mq)

## 功能

- 统一接口,初始化时选择不同的协议适配器
- 单独使用指定协议
- 创建指定个数的连接,推送、订阅时动态均衡分配

## 测试代码

```go
// publishRounds bounds how many sample messages each test publishes, so the
// tests terminate instead of looping forever.
const publishRounds = 5

// TestMqtt connects to a local MQTT broker with five connections, subscribes
// to a topic, and publishes a few sample messages to it.
func TestMqtt(t *testing.T) {
	c, err := NewPublish(&Options{Address: "127.0.0.1:35003", Goc: 5}, NewMqtt)
	if err != nil {
		// c is not usable when NewPublish fails, so Shutdown must not be
		// called on it; t.Fatalf reports the failure without os.Exit.
		t.Fatalf("connecting to MQTT broker: %v", err)
	}
	defer c.Shutdown()

	topic := "test/mqtt"
	c.Subscribe(topic, func(b []byte) error {
		log.Printf("%s: %s\n", topic, b)
		return nil
	})
	for i := 0; i < publishRounds; i++ {
		time.Sleep(2 * time.Second)
		c.Publish(topic, map[string]interface{}{
			"device": "20198002",
			"now":    time.Now().Format("2006-01-02 15:04:05"),
		})
	}
}

// TestStomp connects to a local STOMP broker, registers two subscribers on
// the same queue, and publishes a few sample messages to it.
func TestStomp(t *testing.T) {
	c, err := NewPublish(&Options{Address: "127.0.0.1:35002", Goc: 1}, NewStomp)
	if err != nil {
		t.Fatalf("connecting to STOMP broker: %v", err)
	}
	defer c.Shutdown()

	subject := "/queue/test/stomp"
	c.Subscribe(subject, func(b []byte) error {
		log.Printf("%s:1 %s\n", subject, b)
		return nil
	})
	c.Subscribe(subject, func(b []byte) error {
		log.Printf("%s:2 %s\n", subject, b)
		return nil
	})
	for i := 0; i < publishRounds; i++ {
		time.Sleep(2 * time.Second)
		c.Publish(subject, map[string]interface{}{
			"device": "20198002",
			"now":    time.Now().Format("2006-01-02 15:04:05"),
		})
	}
}

// TestNats connects to the NATS server at NatsURL, registers two subscribers
// on the same subject, and publishes a few sample messages to it.
func TestNats(t *testing.T) {
	// NOTE(review): the original passed NewStomp here, which looks like a
	// copy-paste slip. NewNats is assumed to exist alongside NewMqtt and
	// NewStomp; confirm against the package.
	c, err := NewPublish(&Options{Address: NatsURL, Goc: 1}, NewNats)
	if err != nil {
		t.Fatalf("connecting to NATS server: %v", err)
	}
	defer c.Shutdown()

	subject := "/queue/test/nats"
	c.Subscribe(subject, func(b []byte) error {
		log.Printf("%s:1 %s\n", subject, b)
		return nil
	})
	c.Subscribe(subject, func(b []byte) error {
		log.Printf("%s:2 %s\n", subject, b)
		return nil
	})
	for i := 0; i < publishRounds; i++ {
		time.Sleep(2 * time.Second)
		c.Publish(subject, map[string]interface{}{
			"device": "20198002",
			"now":    time.Now().Format("2006-01-02 15:04:05"),
		})
	}
}
```
原网站

版权声明
本文为[Golang中文社区]所创,转载请带上原文链接,感谢
https://studygolang.com/articles/35742