From 774a24dc80c6f0b6e595ed009daae505500d973a Mon Sep 17 00:00:00 2001 From: 0016 <0016@drgyen.cn> Date: Wed, 21 Aug 2024 16:55:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=96=B0=E5=A2=9E=E7=83=9F=E9=9B=BE?= =?UTF-8?q?=E4=BC=A0=E6=84=9F=E5=99=A8=E6=95=B0=E6=8D=AE=E5=92=8C=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E4=B8=8A=E6=8A=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/hello/v1/hello.go | 1 + go.mod | 14 +- go.sum | 20 ++- internal/cmd/cmd.go | 28 +++- internal/controller/hello/hello_v1_hello.go | 172 +++++++++++++++++++- internal/model/pullDto.go | 113 +++++++++++++ internal/model/pushPowerDto.go | 22 +++ internal/service/mqtt_client.go | 114 +++++++++++++ manifest/config/config.yaml | 5 + 9 files changed, 471 insertions(+), 18 deletions(-) create mode 100644 internal/model/pullDto.go create mode 100644 internal/model/pushPowerDto.go create mode 100644 internal/service/mqtt_client.go diff --git a/api/hello/v1/hello.go b/api/hello/v1/hello.go index 01b72e1..32e5ab1 100644 --- a/api/hello/v1/hello.go +++ b/api/hello/v1/hello.go @@ -17,6 +17,7 @@ type PostReq struct { type PostRes struct { g.Meta `mime:"application/json" example:"string"` + Data string `json:"data"` } type GetReq struct { diff --git a/go.mod b/go.mod index ea765d4..8a06cdf 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,10 @@ module remote-msg go 1.18 -require github.com/gogf/gf/v2 v2.5.2 +require ( + github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/gogf/gf/v2 v2.5.2 +) require ( github.com/BurntSushi/toml v1.2.0 // indirect @@ -11,7 +14,7 @@ require ( github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/gorilla/websocket v1.5.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/grokify/html-strip-tags-go v0.0.1 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/mattn/go-colorable v0.1.13 // indirect @@ -22,8 +25,9 @@ require ( go.opentelemetry.io/otel v1.14.0 // indirect go.opentelemetry.io/otel/sdk v1.14.0 // indirect go.opentelemetry.io/otel/trace v1.14.0 // indirect - golang.org/x/net v0.12.0 // indirect - golang.org/x/sys v0.10.0 // indirect - golang.org/x/text v0.11.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/text v0.16.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 2003de5..e682339 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi github.com/clbanning/mxj/v2 v2.7.0 h1:WA/La7UGCanFe5NpHF0Q3DNtnCsVoxbPKuyBNHWRyME= github.com/clbanning/mxj/v2 v2.7.0/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= @@ -15,8 +17,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/gogf/gf/v2 v2.5.2 h1:fACJE7DJH6iTGHGhgiNY1uuZIZtr2IqQkJ52E+wBnt8= github.com/gogf/gf/v2 v2.5.2/go.mod h1:7yf5qp0BznfsYx7Sw49m3mQvBsHpwAjJk3Q9ZnKoUEc= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q/MOnCQxKMo0= github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= @@ -42,15 +44,17 @@ go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvx go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM= go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M= go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= -golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50= -golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= -golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= -golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/cmd/cmd.go b/internal/cmd/cmd.go index 49536c9..4e4aa0c 100644 --- a/internal/cmd/cmd.go +++ b/internal/cmd/cmd.go @@ -2,11 +2,9 @@ package cmd import ( "context" - "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/os/gcmd" - "remote-msg/internal/controller/hello" ) @@ -28,3 +26,29 @@ var ( }, } ) + +func ctwingListener() { + + /*ctwingMqttClient.Subscribe(topic, func(client mqtt.Client, msg mqtt.Message) { + slog.Info(fmt.Sprintf("接收设备上报数据: %s from topic: %s\n", msg.Payload(), msg.Topic())) + var msgDto model.PostDataDto + err := json.Unmarshal(msg.Payload(), &msgDto) + if err != nil { + slog.Error("Unmarshal error: " + err.Error()) + return + } + go func() { + defer func() { + if r := recover(); r != nil { + slog.Error("meikaListener发生了panic:", r) + } + }() + err := SendDevMessage(msgDto) + if err != nil { + slog.Error(err.Error()) + } + }() + + })*/ + +} diff --git a/internal/controller/hello/hello_v1_hello.go b/internal/controller/hello/hello_v1_hello.go index bd11efd..5d6bcde 100644 --- a/internal/controller/hello/hello_v1_hello.go +++ b/internal/controller/hello/hello_v1_hello.go @@ -2,13 +2,56 @@ package hello import ( "context" + "encoding/json" "fmt" + "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" + "github.com/gogf/gf/v2/util/guid" + "remote-msg/internal/model" + "remote-msg/internal/service" + "sync" "remote-msg/api/hello/v1" ) +var ( + subscribeTopic string + publishTopic string +) + +type MqttInstance struct { + mqttClient *service.MqttClient +} + +var once sync.Once +var mqttInstance *MqttInstance + +func GetMqttInstance() *MqttInstance { + once.Do(func() { + mqttInstance = &MqttInstance{} + mqttInstance.MqttConnect() + }) + return mqttInstance +} + +func (c *MqttInstance) MqttConnect() { + c.mqttClient = &service.MqttClient{ + ClientId: "ctwingListener_" + guid.S(), + } + ctwingBroker, _ := g.Cfg().Get(context.Background(), "mqtt.ctwing.broker") + ctwingUsername, _ := g.Cfg().Get(context.Background(), "mqtt.ctwing.username") + ctwingPassword, _ := g.Cfg().Get(context.Background(), "mqtt.ctwing.password") + c.mqttClient.Broker = ctwingBroker.String() + c.mqttClient.UserName = ctwingUsername.String() + c.mqttClient.PassWord = ctwingPassword.String() + + err := c.mqttClient.Connect() + if err != nil { + panic("mqtt监听失败") + } +} + func (c *ControllerV1) Hello(ctx context.Context, req *v1.HelloReq) (res *v1.HelloRes, err error) { g.RequestFromCtx(ctx).Response.Writeln("Hello World!") return @@ -23,9 +66,132 @@ func (c *ControllerV1) Get(ctx context.Context, req *v1.GetReq) (res *v1.GetRes, } func (c *ControllerV1) Post(ctx context.Context, req *v1.PostReq) (res *v1.PostRes, err error) { + res = new(v1.PostRes) //获取post请求参数 body := g.RequestFromCtx(ctx).GetBody() - fmt.Println(gconv.String(body)) - g.RequestFromCtx(ctx).Response.Writeln("post success!") - return + var dataMap = make(map[string]interface{}) + err = json.Unmarshal(body, &dataMap) + if IMEI, ok := dataMap["IMEI"]; ok { + publishTopic = "/drgyen/" + gconv.String(IMEI) + "/up/" + } else if imei, ok := dataMap["imei"]; ok { + publishTopic = "/drgyen/" + gconv.String(imei) + "/up/" + } else { + g.Log().Error(ctx, "invalid device imei") + return + } + if dataMap["messageType"] == "dataReport" { + var postDataDto model.PostDataDto + err = json.Unmarshal(body, &postDataDto) + var pushData model.PostParamDto + pushData.Action = "post_P" + pushData.Timestamp = postDataDto.Timestamp + pushData.MsgId = gconv.String(postDataDto.Timestamp) + pushData.DeviceSN = gconv.String(postDataDto.IMEI) + pushData.Gateway = gconv.String(postDataDto.IMEI) + var postParamData model.PostParamData + postParamData.Data = gconv.String(postDataDto.Payload) + postParamData.Len = postDataDto.ServiceId + pushData.Data = append(pushData.Data, postParamData) + + err = PublishMessage(pushData) + if err != nil { + g.Log().Error(ctx, "publish message error", err) + return + } + writeStr := gconv.String(pushData) + res.Data = writeStr + return + + } else if dataMap["messageType"] == "eventReport" { + var eventPull model.EventPullDto + err = json.Unmarshal(body, &eventPull) + var pushData model.PostParamDto + pushData.Action = "post_E" + pushData.Timestamp = eventPull.Timestamp + pushData.MsgId = gconv.String(eventPull.Timestamp) + pushData.DeviceSN = gconv.String(eventPull.IMEI) + pushData.Gateway = gconv.String(eventPull.IMEI) + var postEventData model.PostEventData + postEventData.Timestamp = eventPull.Timestamp + eventDataMap := make(map[string]interface{}) + switch eventPull.ServiceId { + case 1001: + postEventData.EventCode = "SmokeAlarm" + var smokeAlarm model.SmokeAlarm + gconv.Struct(eventPull.EventContent, &smokeAlarm) + eventDataMap["state"] = smokeAlarm.SmokeState + eventDataMap["value"] = smokeAlarm.SmokeValue + break + case 1002: + postEventData.EventCode = "TemperatureAlarm" + var temperatureAlarm model.TemperatureAlarm + gconv.Struct(eventPull.EventContent, &temperatureAlarm) + eventDataMap["state"] = temperatureAlarm.TemperatureState + eventDataMap["value"] = temperatureAlarm.Temperature + break + case 1003: + postEventData.EventCode = "BatteryVoltageLowAlarm" + var temperatureAlarm model.BatteryVoltageLowAlarm + gconv.Struct(eventPull.EventContent, &temperatureAlarm) + eventDataMap["state"] = temperatureAlarm.BatteryState + eventDataMap["value"] = temperatureAlarm.BatteryVoltage + break + case 1004: + postEventData.EventCode = "TamperAlarm" + var tamperAlarm model.TamperAlarm + gconv.Struct(eventPull.EventContent, &tamperAlarm) + eventDataMap["state"] = tamperAlarm.TamperState + eventDataMap["value"] = tamperAlarm.TamperState + break + case 1005: + postEventData.EventCode = "ErrorCodeReport" + var temperatureAlarm model.ErrorCodeReport + gconv.Struct(eventPull.EventContent, &temperatureAlarm) + eventDataMap["state"] = temperatureAlarm.ErrorCode + eventDataMap["value"] = temperatureAlarm.ErrorCode + break + default: + return + } + postEventData.Data = append(postEventData.Data, eventDataMap) + pushData.Data = append(pushData.Data, postEventData) + err = PublishMessage(pushData) + if err != nil { + g.Log().Error(ctx, "publish message error", err) + return + } + writeStr := gconv.String(pushData) + res.Data = writeStr + return + } else if dataMap["messageType"] == "deviceOnlineOfflineReport" { + var reportPull model.ReportPullDto + err = json.Unmarshal(body, &reportPull) + writeStr := gconv.String(reportPull.ModuleParams) + err = PublishMessage(writeStr) + if err != nil { + g.Log().Error(ctx, "publish message error", err) + return + } + res.Data = writeStr + return + } else { + return + } + +} + +func PublishMessage(message interface{}) error { + mqttClient := GetMqttInstance().mqttClient + if mqttClient != nil && !mqttClient.GetClient().IsConnected() { + return gerror.New("mqtt client is not connected") + } + if mqttClient == nil { + return gerror.New("mqtt client is not connected") + } + if publishTopic == "" { + return gerror.New("publish topic is empty") + } + + return mqttClient.Publish(publishTopic, false, gconv.String(message)) + } diff --git a/internal/model/pullDto.go b/internal/model/pullDto.go new file mode 100644 index 0000000..6a7168f --- /dev/null +++ b/internal/model/pullDto.go @@ -0,0 +1,113 @@ +package model + +type PostDataDto struct { + UpPacketSN int `json:"upPacketSN"` + UpDataSN int `json:"upDataSN"` + Topic string `json:"topic"` + Timestamp int64 `json:"timestamp"` + TenantId string `json:"tenantId"` + ServiceId int `json:"serviceId"` + Protocol string `json:"protocol"` + ProductId string `json:"productId"` + Payload interface{} `json:"payload"` + MessageType string `json:"messageType"` + DeviceType string `json:"deviceType"` + DeviceId string `json:"deviceId"` + AssocAssetId string `json:"assocAssetId"` + IMSI string `json:"IMSI"` + IMEI string `json:"IMEI"` +} + +type Payload1 struct { + TemperatureState int `json:"temperature_state"` + Temperature int `json:"temperature"` + SmokeValue int `json:"smoke_value"` + SmokeState int `json:"smoke_state"` + BatteryVoltage float64 `json:"battery_voltage"` + BatteryValue int `json:"battery_value"` +} +type Payload2 struct { + Sinr int `json:"sinr"` + Rsrp int `json:"rsrp"` + Pci int `json:"pci"` + Ecl int `json:"ecl"` + CellId int `json:"cell_id"` +} +type Payload3 struct { + TerminalType string `json:"terminal_type"` + SoftwareVersion string `json:"software_version"` + ModuleType string `json:"module_type"` + ManufacturerName string `json:"manufacturer_name"` + HardwareVersion string `json:"hardware_version"` + IMEI string `json:"IMEI"` + ICCID string `json:"ICCID"` +} + +type EventPullDto struct { + Timestamp int64 `json:"timestamp"` + TenantId string `json:"tenantId"` + ServiceId int `json:"serviceId"` + Protocol string `json:"protocol"` + ProductId string `json:"productId"` + MessageType string `json:"messageType"` + EventType int `json:"eventType"` + EventContent interface{} `json:"eventContent"` + DeviceSn string `json:"deviceSn"` + DeviceId string `json:"deviceId"` + IMSI string `json:"IMSI"` + IMEI string `json:"IMEI"` +} + +// 烟感告警 1001 +type SmokeAlarm struct { + SmokeState int `json:"smoke_state"` + SmokeValue int `json:"smoke_value"` +} + +// 温度告警 1002 +type TemperatureAlarm struct { + TemperatureState int `json:"temperature_state"` + Temperature int `json:"temperature"` +} + +// 电池电量低告警 1003 +type BatteryVoltageLowAlarm struct { + BatteryVoltage float64 `json:"battery_voltage"` + BatteryState int `json:"battery_state"` +} + +// 防拆告警 1004 +type TamperAlarm struct { + TamperState int `json:"tamper_state"` +} + +// 故障1005 +type ErrorCodeReport struct { + ErrorCode int `json:"error_code"` +} + +type ReportPullDto struct { + Timestamp int64 `json:"timestamp"` + TenantId string `json:"tenantId"` + Protocol string `json:"protocol"` + ProductId string `json:"productId"` + ModuleParams struct { + Txpower string `json:"txpower"` + Softversion string `json:"softversion"` + Sinr string `json:"sinr"` + Rsrp string `json:"rsrp"` + Module string `json:"module"` + Imsi string `json:"imsi"` + Imei string `json:"imei"` + Iccid string `json:"iccid"` + Chiptype string `json:"chiptype"` + Cellid string `json:"cellid"` + } `json:"moduleParams"` + MessageType string `json:"messageType"` + Ipv4Address string `json:"ipv4Address"` + Iccid string `json:"iccid"` + EventType int `json:"eventType"` + DeviceId string `json:"deviceId"` + Imei string `json:"imei"` + AccessFlag int `json:"accessFlag"` +} diff --git a/internal/model/pushPowerDto.go b/internal/model/pushPowerDto.go new file mode 100644 index 0000000..f51cc1a --- /dev/null +++ b/internal/model/pushPowerDto.go @@ -0,0 +1,22 @@ +package model + +type PostParamDto struct { + MsgId string `json:"msgId"` + DeviceSN string `json:"deviceSN"` + Gateway string `json:"gateway"` + Timestamp int64 `json:"timestamp"` + Action string `json:"action"` + Data []interface{} `json:"data"` +} + +type PostParamData struct { + Addr int `json:"addr"` + Len int `json:"len"` + Data string `json:"data"` +} + +type PostEventData struct { + EventCode string `json:"eventcode"` + Timestamp int64 `json:"timestamp"` + Data []interface{} +} diff --git a/internal/service/mqtt_client.go b/internal/service/mqtt_client.go new file mode 100644 index 0000000..99ffafd --- /dev/null +++ b/internal/service/mqtt_client.go @@ -0,0 +1,114 @@ +package service + +import ( + "fmt" + "github.com/gogf/gf/v2/util/guid" + "strings" + + "log/slog" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +const C_QOS_0 = 0 +const C_QOS_1 = 1 +const C_QOS_2 = 2 + +type OnConnectedHandler func(*MqttClient) + +type MqttClient struct { + Broker string + ClientId string + UserName string + PassWord string + + client mqtt.Client + tokens []mqtt.Token + + OnConnected OnConnectedHandler +} + +func (obj *MqttClient) GetClient() mqtt.Client { + return obj.client +} + +/** +* 连接 + */ +func (obj *MqttClient) Connect() (err error) { + opts := mqtt.NewClientOptions() + opts.AddBroker(obj.Broker) + + if len(strings.TrimSpace(obj.ClientId)) == 0 { + obj.ClientId = "MqttClient" + guid.S() + } + opts.SetClientID(obj.ClientId) + opts.SetUsername(obj.UserName) + opts.SetPassword(obj.PassWord) + + opts.OnConnect = obj.connectHandler + opts.OnConnectionLost = obj.connectLostHandler + opts.SetDefaultPublishHandler(obj.messagePubHandler) + + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + obj.client = client + return nil +} + +func (obj *MqttClient) connectHandler(client mqtt.Client) { + //obj.client = client + slog.Info("Connected", "clientid", obj.ClientId, "Broker", obj.Broker) + if obj.OnConnected != nil { + obj.OnConnected(obj) + } +} + +func (obj *MqttClient) connectLostHandler(client mqtt.Client, err error) { + + slog.Error("Connect lost: "+err.Error(), "clientid", obj.ClientId, "Broker", obj.Broker) +} + +func (obj *MqttClient) messagePubHandler(client mqtt.Client, msg mqtt.Message) { + slog.Info(fmt.Sprintf("publish message: %s from topic: %s\n", msg.Payload(), msg.Topic())) + +} + +/** +* 订阅 +**/ +func (obj *MqttClient) Subscribe(topic string, callback mqtt.MessageHandler) error { + slog.Info(fmt.Sprintf("Subscribe topic: %s\n", topic)) + token := obj.client.Subscribe(topic, C_QOS_0, callback) + //obj.tokens = append(obj.tokens, token) + token.Wait() + return token.Error() +} + +func (obj *MqttClient) UnSubscribe(topic string) { + obj.client.Unsubscribe(topic) +} + +func (obj *MqttClient) Close() { + obj.client.Disconnect(uint(50)) + slog.Info(fmt.Sprintf("Connect close: %s on %s ", obj.ClientId, obj.Broker)) + +} + +func (obj *MqttClient) Publish(topic string, retained bool, payload string) (err error) { + + slog.Info(fmt.Sprintf(">>>>>>Publish %s, message: %s\n", topic, payload)) + //=================================== + //设置消息发送间隔 + //=================================== + token := obj.client.Publish(topic, C_QOS_0, retained, payload) + token.Wait() + err = token.Error() + if err != nil { + slog.Error(fmt.Sprintf("Publish %s error\n", topic), "error", err) + + } + return err +} diff --git a/manifest/config/config.yaml b/manifest/config/config.yaml index 4c94270..848f0db 100644 --- a/manifest/config/config.yaml +++ b/manifest/config/config.yaml @@ -9,3 +9,8 @@ logger: +mqtt: + ctwing: + broker: "tcp://120.77.172.42:8017" + username: "admin" + password: "gy2024"