From de233683ed571493a883c2d3553f0741631265cd Mon Sep 17 00:00:00 2001 From: Branden J Brown Date: Fri, 9 Aug 2024 17:36:29 -0500 Subject: [PATCH] robot, channel: disable learning in offline chats Fixes #35. --- channel/channel.go | 3 +- config.go | 2 - privmsg.go | 3 +- robot.go | 124 ++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 127 insertions(+), 5 deletions(-) diff --git a/channel/channel.go b/channel/channel.go index 156caed..9fde46a 100644 --- a/channel/channel.go +++ b/channel/channel.go @@ -3,6 +3,7 @@ package channel import ( "context" "regexp" + "sync/atomic" "gitlab.com/zephyrtronium/pick" "golang.org/x/time/rate" @@ -35,5 +36,5 @@ type Channel struct { // Effects is the distribution of effects. Effects *pick.Dist[string] // Enabled indicates whether a channel is allowed to learn messages. - Enabled bool + Enabled atomic.Bool } diff --git a/config.go b/config.go index 94fc240..1704c16 100644 --- a/config.go +++ b/config.go @@ -240,8 +240,6 @@ func (robo *Robot) SetTwitchChannels(ctx context.Context, global Global, channel Mod: mod, Memery: channel.NewMemeDetector(ch.Copypasta.Need, fseconds(ch.Copypasta.Within)), Emotes: emotes, - // TODO(zeph): check channel online status for enabled - Enabled: true, } v.Message = func(ctx context.Context, reply, text string) { msg := message.Format(reply, v.Name, "%s", text) diff --git a/privmsg.go b/privmsg.go index d61972c..866550c 100644 --- a/privmsg.go +++ b/privmsg.go @@ -155,7 +155,8 @@ func worker(ctx context.Context, works chan chan func(context.Context), ch chan // learn learns a given message's text if it passes ch's filters. func (robo *Robot) learn(ctx context.Context, ch *channel.Channel, hasher userhash.Hasher, msg *message.Received) { - if !ch.Enabled { + if !ch.Enabled.Load() { + slog.DebugContext(ctx, "not learning in disabled channel", slog.String("in", ch.Name)) return } switch err := robo.privacy.Check(ctx, msg.Sender); err { diff --git a/robot.go b/robot.go index 49613ee..6fd50e4 100644 --- a/robot.go +++ b/robot.go @@ -92,7 +92,6 @@ func (robo *Robot) twitch(ctx context.Context, group *errgroup.Group) error { if err != nil { return err } - // TODO(zeph): resolve usernames in configs to user ids cfg := tmi.ConnectConfig{ Dial: new(tls.Dialer).DialContext, RetryWait: tmi.RetryList(true, 0, time.Second, time.Minute, 5*time.Minute), @@ -105,6 +104,9 @@ func (robo *Robot) twitch(ctx context.Context, group *errgroup.Group) error { robo.tmiLoop(ctx, group, robo.tmi.send, robo.tmi.recv) return nil }) + group.Go(func() error { + return robo.streamsLoop(ctx, robo.channels) + }) tmi.Connect(ctx, cfg, &tmiSlog{slog.Default()}, robo.tmi.send, robo.tmi.recv) return ctx.Err() } @@ -201,6 +203,126 @@ func (robo *Robot) join(ctx context.Context, send chan<- *tmi.Message) { } } +func (robo *Robot) streamsLoop(ctx context.Context, channels map[string]*channel.Channel) error { + // TODO(zeph): one day we should switch to eventsub + // TODO(zeph): remove anything learned since the last check when offline + tok, err := robo.tmi.tokens.Token(ctx) + if err != nil { + return err + } + cl := twitch.Client{ + HTTP: &http.Client{Timeout: 30 * time.Second}, + Token: tok, + ID: robo.tmi.id, + } + streams := make([]twitch.Stream, 0, len(channels)) + m := make(map[string]bool, len(channels)) + // Run once at the start so we start learning in online streams immediately. + streams = streams[:0] + for _, ch := range channels { + n := strings.ToLower(strings.TrimPrefix(ch.Name, "#")) + streams = append(streams, twitch.Stream{UserLogin: n}) + } + for range 5 { + // TODO(zeph): limit to 100 + streams, err = twitch.UserStreams(ctx, cl, streams) + switch err { + case nil: + slog.InfoContext(ctx, "stream infos", slog.Int("count", len(streams))) + // Mark online streams as enabled. + // First map names to online status. + for _, s := range streams { + slog.DebugContext(ctx, "stream", + slog.String("login", s.UserLogin), + slog.String("display", s.UserName), + slog.String("id", s.UserID), + slog.String("type", s.Type), + ) + n := strings.ToLower(s.UserLogin) + m[n] = true + } + // Now loop all streams. + for _, ch := range channels { + n := strings.ToLower(strings.TrimPrefix(ch.Name, "#")) + ch.Enabled.Store(m[n]) + } + case twitch.ErrNeedRefresh: + tok, err := twitchToken(ctx, robo.tmi.tokens) + if err != nil { + slog.ErrorContext(ctx, "failed to refresh token", slog.Any("err", err)) + return fmt.Errorf("couldn't get valid access token: %w", err) + } + cl.Token = tok + continue + default: + slog.ErrorContext(ctx, "failed to query online broadcasters", slog.Any("streams", streams), slog.Any("err", err)) + // All streams are already offline. + } + break + } + streams = streams[:0] + clear(m) + + tick := time.NewTicker(time.Minute) + go func() { + <-ctx.Done() + tick.Stop() + }() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-tick.C: + for _, ch := range channels { + n := strings.TrimPrefix(ch.Name, "#") + streams = append(streams, twitch.Stream{UserLogin: n}) + } + for range 5 { + // TODO(zeph): limit to 100 + streams, err = twitch.UserStreams(ctx, cl, streams) + switch err { + case nil: + slog.InfoContext(ctx, "stream infos", slog.Int("count", len(streams))) + // Mark online streams as enabled. + // First map names to online status. + for _, s := range streams { + slog.DebugContext(ctx, "stream", + slog.String("login", s.UserLogin), + slog.String("display", s.UserName), + slog.String("id", s.UserID), + slog.String("type", s.Type), + ) + n := strings.ToLower(s.UserLogin) + m[n] = true + } + // Now loop all streams. + for _, ch := range channels { + n := strings.ToLower(strings.TrimPrefix(ch.Name, "#")) + ch.Enabled.Store(m[n]) + } + case twitch.ErrNeedRefresh: + tok, err := twitchToken(ctx, robo.tmi.tokens) + if err != nil { + slog.ErrorContext(ctx, "failed to refresh token", slog.Any("err", err)) + return fmt.Errorf("couldn't get valid access token: %w", err) + } + cl.Token = tok + continue + default: + slog.ErrorContext(ctx, "failed to query online broadcasters", slog.Any("streams", streams), slog.Any("err", err)) + // Set all streams as offline. + for _, ch := range channels { + ch.Enabled.Store(false) + } + } + break + } + streams = streams[:0] + clear(m) + } + } +} + func deviceCodePrompt(userCode, verURI, verURIComplete string) { fmt.Println("\n---- OAuth2 Device Code Flow ----") if verURIComplete != "" {