夏天的禁卫军 發表於 2020-12-22 17:12:00

GO语言mqtt使用(例子)

<h1 id="go语言使用mqtt">GO语言使用mqtt</h1>
<pre><code>"github.com/eclipse/paho.mqtt.golang" // MQTT 客户端库

"github.com/sirupsen/logrus" // 日志库
</code></pre>
<pre><code>package mqtt

import (
        "crypto/tls"
        "encoding/json"
        "errors"
        "os"
        "os/signal"
        "time"
        "fmt"

        MQTT "github.com/eclipse/paho.mqtt.golang"
        log "github.com/sirupsen/logrus"
)

var MqttAgent *Agent

//var AgentMqtt *Agent
var MQTTconfig *config.Config

const (
        MQTT_QOS=1
        MQTTBroker="tcp://xxx.xxx.xxx.xxx:1883"
        MQTTClientId="00000001"
)

// Agent runs an mqtt client
type Agent struct {
        client   MQTT.Client
}

// subscription pairs a topic with the handler to invoke for messages on it.
// NOTE(review): no code visible in this file uses this type — confirm whether
// it is still needed.
type subscription struct {
        topic   string
        handler MQTT.MessageHandler
}


// NewAgent creates an agent
func NewAgent() ( *Agent) {
       
        a:=new(Agent)
        //opts:= MQTT.NewClientOptions().AddBroker(MQTTconfig.MQTTBroker).SetClientID(MQTTconfig.MQTTClientId)
        opts:= MQTT.NewClientOptions().AddBroker(MQTTBroker).SetClientID(MQTTClientId)
        //opts.SetUsername(MQTTconfig.MQTTUser)
        //opts.SetPassword(MQTTconfig.MQTTPasswd)
        opts.SetKeepAlive(5 * time.Second)
        opts.SetPingTimeout(5 * time.Second)
        opts.SetTLSConfig(&amp;tls.Config{
                ClientAuth:         tls.NoClientCert,
                ClientCAs:          nil,
                InsecureSkipVerify: true,
        })
       
        opts.OnConnectionLost = func(c MQTT.Client, err error) {
                log.WithField("error", err).Info("Lost connection")
        }
       
        a.client= MQTT.NewClient(opts)
       
        a.Connect()
        // mqtt.DEBUG = log.New(os.Stdout, "", 0)
        // mqtt.ERROR = log.New(os.Stdout, "", 0)
       
        return a
}


func (a *Agent)RegisterMqttHandler(topic string,handler infra.HandlerFunc){
        if token := a.client.Subscribe(topic,MQTT_QOS,
                func (c MQTT.Client,msg MQTT.Message){
                        var msgmqtt infra.Entity
                        var respentity *infra.RespEntity
                        var ic infra.IComponent
                        var msgdate infra.Message
                        //unmarshal the bgp json, execute action and return status
                        for true {
                                LogOutResp(topic,msg.Payload())
                                if err := json.Unmarshal(msg.Payload(), &amp;msgmqtt); err != nil {
                                        log.Error("RegisterMqttHandler Unmarshal fail")
                                        break
                                }
                                //Syslog.LogOutObj(msg.Topic(), msgBgp, Syslog.RESP_OK)
                                msgdate.Entity=&amp;msgmqtt
                                respentity=handler(ic,topic,&amp;msgdate)

                                break
                        }
                        if respentity!=nil{
                                //Publish resp to server
                                resp_topic :=topic+"/resp"
                                a.SendMqttResp(resp_topic,respentity)
                        }
                       
                }); token.Wait() &amp;&amp; token.Error() != nil {
                        log.WithField("error", token.Error()).Error("Can't subscribe")
                        os.Exit(1)
                }
               
}


// Connect opens a new connection
func (a *Agent) Connect() (err error) {
        token := a.client.Connect()
        if token.WaitTimeout(2*time.Second) == false {
                return errors.New("Open timeout")
        }
        if token.Error() != nil {
                return token.Error()
        }

        go func() {
                done := make(chan os.Signal)
                signal.Notify(done, os.Interrupt)
                &lt;-done
                log.Info("Shutting down agent")
                a.Close()
        }()

        return
}

// Close agent
func (a *Agent) Close() {
        a.client.Disconnect(250)
}

func (a *Agent)SendMqttResp(topic string,msg *infra.RespEntity) {
       
        msgResp := &amp;infra.RespEntity{
                Rid:       msg.Rid,
                Timestamp: uint64(time.Now().Unix()),
                Object:    msg.Object,
                Action:    msg.Action,
                Code:      msg.Code,
                Data:      msg.Data,
        }
        payload, err := json.Marshal(msgResp)
        log.Info(topic)
        log.Info(payload)
        if err != nil {
                log.WithFields(log.Fields{
                        "topic": topic,
                        "rid":   msg.Rid,
                        "obj":   msg.Object,
                        "act":   msg.Action,
                        "code":msg.Code,
                        "date":msg.Data,}).Info("SendMqttResp: Marshal failed")
                return
        }
        a.Publish(topic, true, string(payload))
}

// Publish things
func (a *Agent) Publish(topic string, retain bool, payload string) (err error) {
        token := a.client.Publish(topic, MQTT_QOS, retain, payload)
        if token.WaitTimeout(2*time.Second) == false {
                return errors.New("Publish timout")
        }
        if token.Error() != nil {
                return token.Error()
        }
        return nil
}


func LogOutResp(topic string, msg []byte) {
        msg_resp := "Response"
        msg_val := fmt.Sprintf("Topic=%s Payload=%s", topic, msg)
        log.WithField(msg_resp, msg_val).Info(topic)
}

</code></pre>
<pre><code>/*
* Copyright (c) 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
*    Seth Hoenig
*    Allan Stockdill-Mander
*    Mike Robertson
*/

/*----------------------------------------------------------------------
This sample is designed to demonstrate the ability to set individual
callbacks on a per-subscription basis. There are three handlers in use:
brokerLoadHandler -      $SYS/broker/load/#
brokerConnectionHandler -$SYS/broker/connection/#
brokerClientsHandler -   $SYS/broker/clients/#
The client will receive 100 messages total from those subscriptions,
and then print the total number of messages received from each.
It may take a few moments for the sample to complete running, as it
must wait for messages to be published.
-----------------------------------------------------------------------*/

package main

import (
        "fmt"
        "os"

        MQTT "github.com/eclipse/paho.mqtt.golang"
)

// One unbuffered channel per subscription. Each handler sends a value on its
// channel for every message it receives, and main counts the receives.
var (
        brokerLoad       = make(chan bool)
        brokerConnection = make(chan bool)
        brokerClients    = make(chan bool)
)

func brokerLoadHandler(client MQTT.Client, msg MQTT.Message) {
        brokerLoad &lt;- true
        fmt.Printf("BrokerLoadHandler         ")
        fmt.Printf("[%s]", msg.Topic())
        fmt.Printf("%s\n", msg.Payload())
}

func brokerConnectionHandler(client MQTT.Client, msg MQTT.Message) {
        brokerConnection &lt;- true
        fmt.Printf("BrokerConnectionHandler   ")
        fmt.Printf("[%s]", msg.Topic())
        fmt.Printf("%s\n", msg.Payload())
}

func brokerClientsHandler(client MQTT.Client, msg MQTT.Message) {
        brokerClients &lt;- true
        fmt.Printf("BrokerClientsHandler      ")
        fmt.Printf("[%s]", msg.Topic())
        fmt.Printf("%s\n", msg.Payload())
}

func main() {
        opts := MQTT.NewClientOptions().AddBroker("tcp://iot.eclipse.org:1883").SetClientID("router-sample")
        opts.SetCleanSession(true)

        c := MQTT.NewClient(opts)
        if token := c.Connect(); token.Wait() &amp;&amp; token.Error() != nil {
                panic(token.Error())
        }

        if token := c.Subscribe("$SYS/broker/load/#", 0, brokerLoadHandler); token.Wait() &amp;&amp; token.Error() != nil {
                fmt.Println(token.Error())
                os.Exit(1)
        }

        if token := c.Subscribe("$SYS/broker/connection/#", 0, brokerConnectionHandler); token.Wait() &amp;&amp; token.Error() != nil {
                fmt.Println(token.Error())
                os.Exit(1)
        }

        if token := c.Subscribe("$SYS/broker/clients/#", 0, brokerClientsHandler); token.Wait() &amp;&amp; token.Error() != nil {
                fmt.Println(token.Error())
                os.Exit(1)
        }

        loadCount := 0
        connectionCount := 0
        clientsCount := 0

        for i := 0; i &lt; 100; i++ {
                select {
                case &lt;-brokerLoad:
                        loadCount++
                case &lt;-brokerConnection:
                        connectionCount++
                case &lt;-brokerClients:
                        clientsCount++
                }
        }

        fmt.Printf("Received %3d Broker Load messages\n", loadCount)
        fmt.Printf("Received %3d Broker Connection messages\n", connectionCount)
        fmt.Printf("Received %3d Broker Clients messages\n", clientsCount)

        c.Disconnect(250)
}
</code></pre><br><br>
来源:https://www.cnblogs.com/CYD-self/p/14174133.html
頁: [1]
查看完整版本: GO语言mqtt使用(例子)