remote-msg/internal/service/mqtt_client.go

115 lines
2.6 KiB
Go

package service
import (
"fmt"
"github.com/gogf/gf/v2/util/guid"
"strings"
"log/slog"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
const C_QOS_0 = 0
const C_QOS_1 = 1
const C_QOS_2 = 2
type OnConnectedHandler func(*MqttClient)
type MqttClient struct {
Broker string
ClientId string
UserName string
PassWord string
client mqtt.Client
tokens []mqtt.Token
OnConnected OnConnectedHandler
}
func (obj *MqttClient) GetClient() mqtt.Client {
return obj.client
}
/**
* 连接
*/
func (obj *MqttClient) Connect() (err error) {
opts := mqtt.NewClientOptions()
opts.AddBroker(obj.Broker)
if len(strings.TrimSpace(obj.ClientId)) == 0 {
obj.ClientId = "MqttClient" + guid.S()
}
opts.SetClientID(obj.ClientId)
opts.SetUsername(obj.UserName)
opts.SetPassword(obj.PassWord)
opts.OnConnect = obj.connectHandler
opts.OnConnectionLost = obj.connectLostHandler
opts.SetDefaultPublishHandler(obj.messagePubHandler)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
}
obj.client = client
return nil
}
func (obj *MqttClient) connectHandler(client mqtt.Client) {
//obj.client = client
slog.Info("Connected", "clientid", obj.ClientId, "Broker", obj.Broker)
if obj.OnConnected != nil {
obj.OnConnected(obj)
}
}
func (obj *MqttClient) connectLostHandler(client mqtt.Client, err error) {
slog.Error("Connect lost: "+err.Error(), "clientid", obj.ClientId, "Broker", obj.Broker)
}
func (obj *MqttClient) messagePubHandler(client mqtt.Client, msg mqtt.Message) {
slog.Info(fmt.Sprintf("publish message: %s from topic: %s\n", msg.Payload(), msg.Topic()))
}
/**
* 订阅
**/
func (obj *MqttClient) Subscribe(topic string, callback mqtt.MessageHandler) error {
slog.Info(fmt.Sprintf("Subscribe topic: %s\n", topic))
token := obj.client.Subscribe(topic, C_QOS_0, callback)
//obj.tokens = append(obj.tokens, token)
token.Wait()
return token.Error()
}
func (obj *MqttClient) UnSubscribe(topic string) {
obj.client.Unsubscribe(topic)
}
func (obj *MqttClient) Close() {
obj.client.Disconnect(uint(50))
slog.Info(fmt.Sprintf("Connect close: %s on %s ", obj.ClientId, obj.Broker))
}
func (obj *MqttClient) Publish(topic string, retained bool, payload string) (err error) {
slog.Info(fmt.Sprintf(">>>>>>Publish %s, message: %s\n", topic, payload))
//===================================
//设置消息发送间隔
//===================================
token := obj.client.Publish(topic, C_QOS_0, retained, payload)
token.Wait()
err = token.Error()
if err != nil {
slog.Error(fmt.Sprintf("Publish %s error\n", topic), "error", err)
}
return err
}