54 lines
1.3 KiB
Go
54 lines
1.3 KiB
Go
package kiosk
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
func brokerConsume(ctx context.Context) {
|
|
logger := config.logger
|
|
opts := MQTT.NewClientOptions()
|
|
opts.AddBroker(config.broker)
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
messages := make(chan [2]string)
|
|
opts.SetClientID(fmt.Sprintf("hasskiosk-%s-%d", hostname, os.Getpid()))
|
|
opts.SetUsername(config.username)
|
|
opts.SetPassword(config.password)
|
|
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
|
|
messages <- [2]string{msg.Topic(), string(msg.Payload())}
|
|
})
|
|
done := make(chan bool)
|
|
|
|
logger.Debug("creating client")
|
|
client := MQTT.NewClient(opts)
|
|
logger.Debug("starting client")
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
logger.Error("MQTT client startup problem", "err", token.Error())
|
|
return
|
|
}
|
|
defer client.Disconnect(250)
|
|
if token := client.Subscribe(config.presenceTopic, 1, nil); token.Wait() && token.Error() != nil {
|
|
logger.Error("MQTT subscribe error", "err", token.Error())
|
|
return
|
|
}
|
|
go func() {
|
|
for true {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Info("Context shutdown. Exiting MQTT loop")
|
|
done <- true
|
|
return
|
|
case msg := <-messages:
|
|
go handlePresenceMessage(msg)
|
|
}
|
|
}
|
|
}()
|
|
<-done
|
|
}
|