From 913193463abaa7086dc6ba23b0c45f998d76b33f Mon Sep 17 00:00:00 2001 From: huangzhiran Date: Sun, 18 Feb 2024 16:08:45 +0800 Subject: [PATCH] feat(enode): change task state and query unpacked message will return received --- cmd/enode/api/http.go | 17 +++++++++++++++-- persistence/postgres.go | 31 ++++++++++++++++++++++++++----- task/processor.go | 2 +- types/message.go | 7 +++++++ types/task.go | 12 ++++++------ 5 files changed, 55 insertions(+), 14 deletions(-) diff --git a/cmd/enode/api/http.go b/cmd/enode/api/http.go index 2e0661fd..2299270e 100644 --- a/cmd/enode/api/http.go +++ b/cmd/enode/api/http.go @@ -139,13 +139,27 @@ func (s *HttpServer) queryStateLogByID(c *gin.Context) { messageID := c.Param("id") + ms, err := s.pg.FetchMessage(messageID) + if err != nil { + c.JSON(http.StatusInternalServerError, newErrResp(err)) + return + } + if len(ms) == 0 { + c.JSON(http.StatusOK, &queryMessageStateLogResp{MessageID: messageID}) + return + } + ls, err := s.pg.FetchStateLog(messageID) if err != nil { c.JSON(http.StatusInternalServerError, newErrResp(err)) return } - ss := []*stateLog{} + ss := []*stateLog{ + { + State: "received", + Time: ms[0].CreatedAt, + }} for _, l := range ls { ss = append(ss, &stateLog{ State: l.State.String(), @@ -154,7 +168,6 @@ func (s *HttpServer) queryStateLogByID(c *gin.Context) { }) } - slog.Debug("received message querying", "message_id", messageID) c.JSON(http.StatusOK, &queryMessageStateLogResp{MessageID: messageID, States: ss}) } diff --git a/persistence/postgres.go b/persistence/postgres.go index f8aa3dc9..be7e0663 100644 --- a/persistence/postgres.go +++ b/persistence/postgres.go @@ -54,12 +54,12 @@ func (p *Postgres) Save(msg *types.Message, config *project.Config) error { TaskID: tid, MessageID: msg.ID, ProjectID: msg.ProjectID, - State: types.TaskStateReceived, + State: types.TaskStatePacked, }} l := taskStateLog{ TaskID: tid, - State: types.TaskStateReceived, + State: types.TaskStatePacked, } return p.db.Transaction(func(tx *gorm.DB) error { @@ -81,7 +81,7 @@ func (p *Postgres) Save(msg *types.Message, config *project.Config) error { TaskID: tid, MessageID: m.MessageID, ProjectID: m.ProjectID, - State: types.TaskStateReceived, + State: types.TaskStatePacked, }) } if err := tx.Model(message{}).Where("message_id IN ?", mids).Update("task_id", tid).Error; err != nil { @@ -105,7 +105,7 @@ func (p *Postgres) Save(msg *types.Message, config *project.Config) error { func (p *Postgres) Fetch() (*types.Task, error) { t := task{} - if err := p.db.Where("state = ?", types.TaskStateReceived).First(&t).Error; err != nil { + if err := p.db.Where("state = ?", types.TaskStatePacked).First(&t).Error; err != nil { if err == gorm.ErrRecordNotFound { return nil, nil } @@ -149,10 +149,31 @@ func (p *Postgres) FetchByID(taskID string) (*types.Task, error) { }, nil } +func (p *Postgres) FetchMessage(messageID string) ([]*types.MessageWithTime, error) { + ms := []*message{} + if err := p.db.Where("message_id = ?", messageID).Find(&ms).Error; err != nil { + return nil, errors.Wrapf(err, "query message by messageID failed, messageID %s", messageID) + } + + tms := []*types.MessageWithTime{} + for _, m := range ms { + tms = append(tms, &types.MessageWithTime{ + Message: types.Message{ + ID: m.MessageID, + ProjectID: m.ProjectID, + ProjectVersion: m.ProjectVersion, + Data: m.Data, + }, + CreatedAt: m.CreatedAt, + }) + } + return tms, nil +} + func (p *Postgres) FetchStateLog(messageID string) ([]*types.TaskStateLog, error) { ts := []*task{} if err := p.db.Where("message_id = ?", messageID).Find(&ts).Error; err != nil { - return nil, errors.Wrapf(err, "query task by message id failed, messageID %s", messageID) + return nil, errors.Wrapf(err, "query task by messageID failed, messageID %s", messageID) } tids := []string{} for _, t := range ts { diff --git a/task/processor.go b/task/processor.go index 1955c42e..a10d6062 100644 --- a/task/processor.go +++ b/task/processor.go @@ -28,7 +28,7 @@ func (r *Processor) handleP2PData(d *p2p.Data, topic *pubsub.Topic) { tid := d.Task.ID ms := d.Task.Messages slog.Debug("get new task", "task_id", tid) - r.reportSuccess(tid, types.TaskStateFetched, "", topic) + r.reportSuccess(tid, types.TaskStateDispatched, "", topic) config, err := r.projectManager.Get(ms[0].ProjectID, ms[0].ProjectVersion) if err != nil { diff --git a/types/message.go b/types/message.go index 41604bd8..8e6a3a93 100644 --- a/types/message.go +++ b/types/message.go @@ -1,8 +1,15 @@ package types +import "time" + type Message struct { ID string `json:"id"` ProjectID uint64 `json:"projectID"` ProjectVersion string `json:"projectVersion"` Data string `json:"data"` } + +type MessageWithTime struct { + Message + CreatedAt time.Time +} diff --git a/types/task.go b/types/task.go index 8f31487b..2e712877 100644 --- a/types/task.go +++ b/types/task.go @@ -11,8 +11,8 @@ type TaskState uint8 const ( TaskStateInvalid TaskState = iota - TaskStateReceived - TaskStateFetched + TaskStatePacked + TaskStateDispatched _ TaskStateProved _ @@ -29,10 +29,10 @@ type TaskStateLog struct { func (s TaskState) String() string { switch s { - case TaskStateReceived: - return "received" - case TaskStateFetched: - return "fetched" + case TaskStatePacked: + return "packed" + case TaskStateDispatched: + return "dispatched" case TaskStateProved: return "proved" case TaskStateOutputted: