54 lines
1.3 KiB
Go

package kiosk
import (
"context"
"fmt"
"os"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
func brokerConsume(ctx context.Context) {
logger := config.logger
opts := MQTT.NewClientOptions()
opts.AddBroker(config.broker)
hostname, err := os.Hostname()
if err != nil {
panic(err)
}
messages := make(chan [2]string)
opts.SetClientID(fmt.Sprintf("hasskiosk-%s-%d", hostname, os.Getpid()))
opts.SetUsername(config.username)
opts.SetPassword(config.password)
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
messages <- [2]string{msg.Topic(), string(msg.Payload())}
})
done := make(chan bool)
logger.Debug("creating client")
client := MQTT.NewClient(opts)
logger.Debug("starting client")
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Error("MQTT client startup problem", "err", token.Error())
return
}
defer client.Disconnect(250)
if token := client.Subscribe(config.presenceTopic, 1, nil); token.Wait() && token.Error() != nil {
logger.Error("MQTT subscribe error", "err", token.Error())
return
}
go func() {
for true {
select {
case <-ctx.Done():
logger.Info("Context shutdown. Exiting MQTT loop")
done <- true
return
case msg := <-messages:
go handlePresenceMessage(msg)
}
}
}()
<-done
}