mqtt reading from broker

This commit is contained in:
Mike Bloy 2024-09-15 12:25:06 -05:00
parent d22da95cce
commit f085312e13
6 changed files with 63 additions and 20 deletions

View File

@ -9,7 +9,10 @@ import (
"os/signal"
"syscall"
"git.bloy.org/mike/hasshelper/kiosk"
"git.bloy.org/mike/hasshelper/web"
_ "github.com/joho/godotenv/autoload"
"github.com/lmittmann/tint"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@ -25,16 +28,19 @@ var rootCmd = &cobra.Command{
func rootCmdRun(cmd *cobra.Command, args []string) {
var logLevel slog.Level
logLevel.UnmarshalText([]byte(viper.GetString("loglevel")))
logger := slog.New(slog.NewTextHandler(os.Stdout,
&slog.HandlerOptions{Level: logLevel}))
logger := slog.New(
tint.NewHandler(os.Stdout, &tint.Options{
Level: logLevel,
TimeFormat: "2006-01-02T15:04:05.999",
NoColor: viper.GetString("deployment") == "prod",
}))
logger.Info("HASSHelper startup", "version", viper.GetString("version"))
exitchan := make(chan bool)
signalchan := make(chan os.Signal, 1)
done := make(chan bool)
done := make(chan bool, 2)
signal.Notify(signalchan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
web.Run(logger, exitchan, ctx)
go func() {
select {
case <-signalchan:
@ -47,6 +53,9 @@ func rootCmdRun(cmd *cobra.Command, args []string) {
done <- true
}
}()
go web.Run(logger, exitchan, ctx)
go kiosk.Run(logger, exitchan, ctx)
logger.Debug("Waiting for exit")
<-done
}
@ -59,26 +68,26 @@ func Execute() {
}
func init() {
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "",
"config file (default is $HOME/.config/hasshelper/config.toml)")
cobra.OnInitialize(initConfig)
const dirName = "hasshelper"
var userConfigDir, err = os.UserConfigDir()
const dirName = "hasshelper"
var defDir = fmt.Sprintf("%s%c%s", "/etc", os.PathSeparator, dirName)
if err == nil {
viper.AddConfigPath(
fmt.Sprintf("%s%c%s", userConfigDir, os.PathSeparator, dirName))
defDir = fmt.Sprintf("%s%c%s", userConfigDir, os.PathSeparator, dirName)
viper.AddConfigPath(defDir)
} else {
log.Println("could not locate user config dir:", err)
}
rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "",
fmt.Sprintf("config file (default is %s%c%s)", defDir, os.PathSeparator, "hasshelper.toml"))
cobra.OnInitialize(initConfig)
viper.AddConfigPath(fmt.Sprintf("/etc%c%s", os.PathSeparator, dirName))
viper.SetConfigName("config.toml")
viper.SetEnvPrefix("hasshelper")
}
func initConfig() {
viper.Set("version", gitVersion)
viper.SetDefault("deployment", "prod")
viper.SetDefault("loglevel", "info")
viper.SetEnvPrefix("HASS")
viper.AutomaticEnv()
if cfgFile != "" {
viper.SetConfigFile(cfgFile)

2
go.mod
View File

@ -8,6 +8,8 @@ require (
github.com/cortesi/devd v0.0.0-20200427000907-c1a3bfba27d8
github.com/cortesi/modd v0.8.1
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/joho/godotenv v1.5.1
github.com/lmittmann/tint v1.0.5
github.com/mdomke/git-semver/v6 v6.9.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0

4
go.sum
View File

@ -106,6 +106,8 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A=
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/juju/ratelimit v1.0.1/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI=
github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
@ -120,6 +122,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw=
github.com/lmittmann/tint v1.0.5/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=

View File

@ -1,6 +1,7 @@
package kiosk
import (
"context"
"log/slog"
"github.com/spf13/viper"
@ -18,12 +19,13 @@ type configObj struct {
var config = configObj{}
func Run(rootLogger *slog.Logger, exitch chan bool) {
func Run(rootLogger *slog.Logger, exitch chan bool, ctx context.Context) {
config.logger = rootLogger.With("component", "kiosk")
config.broker = viper.GetString("mqtt_broker_url")
config.username = viper.GetString("mqtt_broker_user")
config.password = viper.GetString("mqtt_broker_password")
config.presenceTopic = viper.GetString("mqtt_presence_topic")
brokerConsume()
config.logger.Info("starting MQTT broker client")
brokerConsume(ctx)
exitch <- true
}

View File

@ -1,27 +1,53 @@
package kiosk
import (
"context"
"fmt"
"os"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
func brokerConsume() {
func brokerConsume(ctx context.Context) {
logger := config.logger
opts := MQTT.NewClientOptions()
opts.AddBroker(config.broker)
hostname, err := os.Hostname()
if err != nil {
panic(err)
}
messages := make(chan [2]string)
opts.SetClientID(fmt.Sprintf("hasskiosk-%s-%d", hostname, os.Getpid()))
opts.SetUsername(config.username)
opts.SetPassword(config.password)
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
messages <- [2]string{msg.Topic(), string(msg.Payload())}
})
}
func defaultMQTTHandler(client MQTT.Client, msg MQTT.Message) {
done := make(chan bool)
logger.Debug("creating client")
client := MQTT.NewClient(opts)
logger.Debug("starting client")
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Error("MQTT client startup problem", "err", token.Error())
return
}
defer client.Disconnect(250)
if token := client.Subscribe(config.presenceTopic, 1, nil); token.Wait() && token.Error() != nil {
logger.Error("MQTT subscribe error", "err", token.Error())
return
}
go func() {
for true {
select {
case <-ctx.Done():
logger.Info("Context shutdown. Exiting MQTT loop")
done <- true
return
case msg := <-messages:
logger.Info("MQTT message received", "topic", msg[0], "body", msg[1])
}
}
}()
<-done
}

View File

@ -9,5 +9,5 @@
**/*.go .env {
prep: go mod tidy
prep: go test @dirmods
daemon +sigterm: go run ./main.go -c ../hasshelper.toml
daemon +sigterm: go run ./main.go
}