Skip to content

Commit

Permalink
robot, channel: disable learning in offline chats
Browse files Browse the repository at this point in the history
Fixes #35.
  • Loading branch information
zephyrtronium committed Aug 9, 2024
1 parent 73ac6b8 commit de23368
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 5 deletions.
3 changes: 2 additions & 1 deletion channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package channel
import (
"context"
"regexp"
"sync/atomic"

"gitlab.com/zephyrtronium/pick"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -35,5 +36,5 @@ type Channel struct {
// Effects is the distribution of effects.
Effects *pick.Dist[string]
// Enabled indicates whether a channel is allowed to learn messages.
Enabled bool
Enabled atomic.Bool
}
2 changes: 0 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ func (robo *Robot) SetTwitchChannels(ctx context.Context, global Global, channel
Mod: mod,
Memery: channel.NewMemeDetector(ch.Copypasta.Need, fseconds(ch.Copypasta.Within)),
Emotes: emotes,
// TODO(zeph): check channel online status for enabled
Enabled: true,
}
v.Message = func(ctx context.Context, reply, text string) {
msg := message.Format(reply, v.Name, "%s", text)
Expand Down
3 changes: 2 additions & 1 deletion privmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func worker(ctx context.Context, works chan chan func(context.Context), ch chan

// learn learns a given message's text if it passes ch's filters.
func (robo *Robot) learn(ctx context.Context, ch *channel.Channel, hasher userhash.Hasher, msg *message.Received) {
if !ch.Enabled {
if !ch.Enabled.Load() {
slog.DebugContext(ctx, "not learning in disabled channel", slog.String("in", ch.Name))
return
}
switch err := robo.privacy.Check(ctx, msg.Sender); err {
Expand Down
124 changes: 123 additions & 1 deletion robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (robo *Robot) twitch(ctx context.Context, group *errgroup.Group) error {
if err != nil {
return err
}
// TODO(zeph): resolve usernames in configs to user ids
cfg := tmi.ConnectConfig{
Dial: new(tls.Dialer).DialContext,
RetryWait: tmi.RetryList(true, 0, time.Second, time.Minute, 5*time.Minute),
Expand All @@ -105,6 +104,9 @@ func (robo *Robot) twitch(ctx context.Context, group *errgroup.Group) error {
robo.tmiLoop(ctx, group, robo.tmi.send, robo.tmi.recv)
return nil
})
group.Go(func() error {
return robo.streamsLoop(ctx, robo.channels)
})
tmi.Connect(ctx, cfg, &tmiSlog{slog.Default()}, robo.tmi.send, robo.tmi.recv)
return ctx.Err()
}
Expand Down Expand Up @@ -201,6 +203,126 @@ func (robo *Robot) join(ctx context.Context, send chan<- *tmi.Message) {
}
}

func (robo *Robot) streamsLoop(ctx context.Context, channels map[string]*channel.Channel) error {
// TODO(zeph): one day we should switch to eventsub
// TODO(zeph): remove anything learned since the last check when offline
tok, err := robo.tmi.tokens.Token(ctx)
if err != nil {
return err
}
cl := twitch.Client{
HTTP: &http.Client{Timeout: 30 * time.Second},
Token: tok,
ID: robo.tmi.id,
}
streams := make([]twitch.Stream, 0, len(channels))
m := make(map[string]bool, len(channels))
// Run once at the start so we start learning in online streams immediately.
streams = streams[:0]
for _, ch := range channels {
n := strings.ToLower(strings.TrimPrefix(ch.Name, "#"))
streams = append(streams, twitch.Stream{UserLogin: n})
}
for range 5 {
// TODO(zeph): limit to 100
streams, err = twitch.UserStreams(ctx, cl, streams)
switch err {
case nil:
slog.InfoContext(ctx, "stream infos", slog.Int("count", len(streams)))
// Mark online streams as enabled.
// First map names to online status.
for _, s := range streams {
slog.DebugContext(ctx, "stream",
slog.String("login", s.UserLogin),
slog.String("display", s.UserName),
slog.String("id", s.UserID),
slog.String("type", s.Type),
)
n := strings.ToLower(s.UserLogin)
m[n] = true
}
// Now loop all streams.
for _, ch := range channels {
n := strings.ToLower(strings.TrimPrefix(ch.Name, "#"))
ch.Enabled.Store(m[n])
}
case twitch.ErrNeedRefresh:
tok, err := twitchToken(ctx, robo.tmi.tokens)
if err != nil {
slog.ErrorContext(ctx, "failed to refresh token", slog.Any("err", err))
return fmt.Errorf("couldn't get valid access token: %w", err)
}
cl.Token = tok
continue
default:
slog.ErrorContext(ctx, "failed to query online broadcasters", slog.Any("streams", streams), slog.Any("err", err))
// All streams are already offline.
}
break
}
streams = streams[:0]
clear(m)

tick := time.NewTicker(time.Minute)
go func() {
<-ctx.Done()
tick.Stop()
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
for _, ch := range channels {
n := strings.TrimPrefix(ch.Name, "#")
streams = append(streams, twitch.Stream{UserLogin: n})
}
for range 5 {
// TODO(zeph): limit to 100
streams, err = twitch.UserStreams(ctx, cl, streams)
switch err {
case nil:
slog.InfoContext(ctx, "stream infos", slog.Int("count", len(streams)))
// Mark online streams as enabled.
// First map names to online status.
for _, s := range streams {
slog.DebugContext(ctx, "stream",
slog.String("login", s.UserLogin),
slog.String("display", s.UserName),
slog.String("id", s.UserID),
slog.String("type", s.Type),
)
n := strings.ToLower(s.UserLogin)
m[n] = true
}
// Now loop all streams.
for _, ch := range channels {
n := strings.ToLower(strings.TrimPrefix(ch.Name, "#"))
ch.Enabled.Store(m[n])
}
case twitch.ErrNeedRefresh:
tok, err := twitchToken(ctx, robo.tmi.tokens)
if err != nil {
slog.ErrorContext(ctx, "failed to refresh token", slog.Any("err", err))
return fmt.Errorf("couldn't get valid access token: %w", err)
}
cl.Token = tok
continue
default:
slog.ErrorContext(ctx, "failed to query online broadcasters", slog.Any("streams", streams), slog.Any("err", err))
// Set all streams as offline.
for _, ch := range channels {
ch.Enabled.Store(false)
}
}
break
}
streams = streams[:0]
clear(m)
}
}
}

func deviceCodePrompt(userCode, verURI, verURIComplete string) {
fmt.Println("\n---- OAuth2 Device Code Flow ----")
if verURIComplete != "" {
Expand Down

0 comments on commit de23368

Please sign in to comment.