diff --git a/schema/schema.sql b/schema/schema.sql index cc59a56..dad2330 100644 --- a/schema/schema.sql +++ b/schema/schema.sql @@ -13,3 +13,8 @@ CREATE TABLE `users` ( `traq_uuid` VARCHAR(36) NOT NULL, `is_bot` BOOLEAN ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +CREATE TABLE IF NOT EXISTS `pollinginfo` ( + `key` int NOT NULL, + `lastpollingtime` datetime NOT NULL, + PRIMARY KEY (`key`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; diff --git a/server/model/db.go b/server/model/db.go index 7f24749..403257f 100644 --- a/server/model/db.go +++ b/server/model/db.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "time" _ "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" @@ -99,3 +100,24 @@ func removeAlreadyExistUsers(allUsers UserList, alreadyUsersUUID []string) UserL } return newUserList } + +func RecordPollingTime(lastCheckPoint time.Time) error { + _, err := db.Exec("INSERT INTO `pollinginfo`(`key`,`lastpollingtime`) VALUES(1,?) ON DUPLICATE KEY UPDATE `lastpollingtime`=VALUES(lastpollingtime)", lastCheckPoint) + if err != nil { + slog.Info("Error recording pollinginfo: %v", err) + return err + } + + return nil +} + +func GetPollingFrom() (time.Time, error) { + var from time.Time + err := db.Get(&from, "SELECT `lastpollingtime` FROM `pollinginfo` WHERE `key`=1") + if err != nil { + slog.Info("Error recording pollinginfo: %v", err) + return from, err + } + + return from, nil +} diff --git a/server/traqmessage/collect.go b/server/traqmessage/collect.go index ade3199..dfdef7a 100644 --- a/server/traqmessage/collect.go +++ b/server/traqmessage/collect.go @@ -3,10 +3,10 @@ package traqmessage import ( "context" "fmt" - "traQ-gazer/model" "strings" "sync" "time" + "traQ-gazer/model" "github.com/traPtitech/go-traq" "golang.org/x/exp/slog" @@ -30,7 +30,12 @@ func (m *MessagePoller) Run() { pollingInterval := time.Minute * 3 - lastCheckpoint := time.Now() + lastCheckpoint, err := model.GetPollingFrom() + if err != nil { + slog.Info("Error getting polling from") + lastCheckpoint = time.Now() + } + var checkpointMutex sync.Mutex ticker := time.Tick(pollingInterval) @@ -40,21 +45,34 @@ func (m *MessagePoller) Run() { now := time.Now() var collectedMessageCount int + var tmplastCheckpoint time.Time + for page := 0; ; page++ { messages, more, err := collectMessages(lastCheckpoint, now, page) + if err != nil { slog.Error(fmt.Sprintf("Failed to polling messages: %v", err)) + lastCheckpoint = now + err := model.RecordPollingTime(lastCheckpoint) + if err != nil { + slog.Error(fmt.Sprintf("Failed to recording polling time: %v", err)) + } break } tmpMessageCount := len(*messages) + // ページ0の時なら検索対象最新メッセージが真に最新メッセージ + if page == 0 { + tmplastCheckpoint = (*messages)[0].CreatedAt + } + slog.Info(fmt.Sprintf("Collected %d messages", tmpMessageCount)) + collectedMessageCount += tmpMessageCount // 取得したメッセージを使っての処理の呼び出し m.processor.enqueue(messages) - if !more { break } @@ -62,7 +80,11 @@ func (m *MessagePoller) Run() { slog.Info(fmt.Sprintf("%d messages collected totally", collectedMessageCount)) - lastCheckpoint = now + lastCheckpoint = tmplastCheckpoint + err := model.RecordPollingTime(lastCheckpoint) + if err != nil { + slog.Error(fmt.Sprintf("Failed to polling messages: %v", err)) + } checkpointMutex.Unlock() } } @@ -151,13 +173,15 @@ func collectMessages(from time.Time, to time.Time, page int) (*[]traq.Message, b // 1度での取得上限は100まで それ以上はoffsetを使うこと // https://github.com/traPtitech/traQ/blob/47ed2cf94b2209c8444533326dee2a588936d5e0/service/search/engine.go#L51 const limit = 100 - result, _, err := client.MessageApi.SearchMessages(auth).After(from).Before(to).Limit(limit).Offset(int32(limit * page)).Execute() + result, _, err := client.MessageApi.SearchMessages(auth).After(from).Before(to).Limit(limit).Offset(int32(limit * page)).Sort(`createdAt`).Execute() + if err != nil { return nil, false, err } messages := result.Hits - more := limit * (page + 1) < int(result.TotalHits) + + more := limit*(page+1) < int(result.TotalHits) return &messages, more, nil }