package kiosk import ( "context" "fmt" "os" MQTT "github.com/eclipse/paho.mqtt.golang" ) func brokerConsume(ctx context.Context) { logger := config.logger opts := MQTT.NewClientOptions() opts.AddBroker(config.broker) hostname, err := os.Hostname() if err != nil { panic(err) } messages := make(chan [2]string) opts.SetClientID(fmt.Sprintf("hasskiosk-%s-%d", hostname, os.Getpid())) opts.SetUsername(config.username) opts.SetPassword(config.password) opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) { messages <- [2]string{msg.Topic(), string(msg.Payload())} }) done := make(chan bool) logger.Debug("creating client") client := MQTT.NewClient(opts) logger.Debug("starting client") if token := client.Connect(); token.Wait() && token.Error() != nil { logger.Error("MQTT client startup problem", "err", token.Error()) return } defer client.Disconnect(250) if token := client.Subscribe(config.presenceTopic, 1, nil); token.Wait() && token.Error() != nil { logger.Error("MQTT subscribe error", "err", token.Error()) return } go func() { for true { select { case <-ctx.Done(): logger.Info("Context shutdown. Exiting MQTT loop") done <- true return case msg := <-messages: logger.Info("MQTT message received", "topic", msg[0], "body", msg[1]) } } }() <-done }