Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ポーリング対象範囲の改善と伴う変数の追加 #133

Merged
merged 13 commits into from
Jan 29, 2024
5 changes: 5 additions & 0 deletions schema/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ CREATE TABLE `users` (
`traq_uuid` VARCHAR(36) NOT NULL,
`is_bot` BOOLEAN
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE IF NOT EXISTS `pollinginfo` (
`key` int NOT NULL,
`lastpollingtime` datetime NOT NULL,
PRIMARY KEY (`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
22 changes: 22 additions & 0 deletions server/model/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -99,3 +100,24 @@ func removeAlreadyExistUsers(allUsers UserList, alreadyUsersUUID []string) UserL
}
return newUserList
}

func RecordPollingTime(lastCheckPoint time.Time) error {
_, err := db.Exec("INSERT INTO `pollinginfo`(`key`,`lastpollingtime`) VALUES(1,?) ON DUPLICATE KEY UPDATE `lastpollingtime`=VALUES(lastpollingtime)", lastCheckPoint)
if err != nil {
slog.Info("Error recording pollinginfo: %v", err)
return err
}

return nil
}

func GetPollingFrom() (time.Time, error) {
var from time.Time
err := db.Get(&from, "SELECT `lastpollingtime` FROM `pollinginfo` WHERE `key`=1")
if err != nil {
slog.Info("Error recording pollinginfo: %v", err)
return from, err
}

return from, nil
}
36 changes: 30 additions & 6 deletions server/traqmessage/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package traqmessage
import (
"context"
"fmt"
"traQ-gazer/model"
"strings"
"sync"
"time"
"traQ-gazer/model"

"github.com/traPtitech/go-traq"
"golang.org/x/exp/slog"
Expand All @@ -30,7 +30,12 @@ func (m *MessagePoller) Run() {

pollingInterval := time.Minute * 3

lastCheckpoint := time.Now()
Pugma marked this conversation as resolved.
Show resolved Hide resolved
lastCheckpoint, err := model.GetPollingFrom()
if err != nil {
slog.Info("Error getting polling from")
lastCheckpoint = time.Now()
}

var checkpointMutex sync.Mutex

ticker := time.Tick(pollingInterval)
Expand All @@ -40,29 +45,46 @@ func (m *MessagePoller) Run() {

now := time.Now()
var collectedMessageCount int
var tmplastCheckpoint time.Time

for page := 0; ; page++ {
messages, more, err := collectMessages(lastCheckpoint, now, page)

if err != nil {
slog.Error(fmt.Sprintf("Failed to polling messages: %v", err))
lastCheckpoint = now
err := model.RecordPollingTime(lastCheckpoint)
if err != nil {
slog.Error(fmt.Sprintf("Failed to recording polling time: %v", err))
}
break
}

tmpMessageCount := len(*messages)

// ページ0の時なら検索対象最新メッセージが真に最新メッセージ
if page == 0 {
tmplastCheckpoint = (*messages)[0].CreatedAt
}
Pugma marked this conversation as resolved.
Show resolved Hide resolved

slog.Info(fmt.Sprintf("Collected %d messages", tmpMessageCount))

collectedMessageCount += tmpMessageCount

// 取得したメッセージを使っての処理の呼び出し
m.processor.enqueue(messages)

if !more {
break
}
}

slog.Info(fmt.Sprintf("%d messages collected totally", collectedMessageCount))

lastCheckpoint = now
lastCheckpoint = tmplastCheckpoint
err := model.RecordPollingTime(lastCheckpoint)
if err != nil {
slog.Error(fmt.Sprintf("Failed to polling messages: %v", err))
}
checkpointMutex.Unlock()
}
}
Expand Down Expand Up @@ -151,13 +173,15 @@ func collectMessages(from time.Time, to time.Time, page int) (*[]traq.Message, b
// 1度での取得上限は100まで それ以上はoffsetを使うこと
// https://github.com/traPtitech/traQ/blob/47ed2cf94b2209c8444533326dee2a588936d5e0/service/search/engine.go#L51
const limit = 100
result, _, err := client.MessageApi.SearchMessages(auth).After(from).Before(to).Limit(limit).Offset(int32(limit * page)).Execute()
result, _, err := client.MessageApi.SearchMessages(auth).After(from).Before(to).Limit(limit).Offset(int32(limit * page)).Sort(`createdAt`).Execute()

if err != nil {
return nil, false, err
}

messages := result.Hits
more := limit * (page + 1) < int(result.TotalHits)

more := limit*(page+1) < int(result.TotalHits)
return &messages, more, nil
}

Expand Down
Loading