Skip to content

Commit

Permalink
feat(enode): change task state and query unpacked message will return…
Browse files Browse the repository at this point in the history
… received
  • Loading branch information
huangzhiran committed Feb 18, 2024
1 parent 30c33c7 commit 9131934
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 14 deletions.
17 changes: 15 additions & 2 deletions cmd/enode/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,27 @@ func (s *HttpServer) queryStateLogByID(c *gin.Context) {

messageID := c.Param("id")

ms, err := s.pg.FetchMessage(messageID)
if err != nil {
c.JSON(http.StatusInternalServerError, newErrResp(err))
return
}
if len(ms) == 0 {
c.JSON(http.StatusOK, &queryMessageStateLogResp{MessageID: messageID})
return
}

ls, err := s.pg.FetchStateLog(messageID)
if err != nil {
c.JSON(http.StatusInternalServerError, newErrResp(err))
return
}

ss := []*stateLog{}
ss := []*stateLog{
{
State: "received",
Time: ms[0].CreatedAt,
}}
for _, l := range ls {
ss = append(ss, &stateLog{
State: l.State.String(),
Expand All @@ -154,7 +168,6 @@ func (s *HttpServer) queryStateLogByID(c *gin.Context) {
})
}

slog.Debug("received message querying", "message_id", messageID)
c.JSON(http.StatusOK, &queryMessageStateLogResp{MessageID: messageID, States: ss})
}

Expand Down
31 changes: 26 additions & 5 deletions persistence/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ func (p *Postgres) Save(msg *types.Message, config *project.Config) error {
TaskID: tid,
MessageID: msg.ID,
ProjectID: msg.ProjectID,
State: types.TaskStateReceived,
State: types.TaskStatePacked,
}}

l := taskStateLog{
TaskID: tid,
State: types.TaskStateReceived,
State: types.TaskStatePacked,
}

return p.db.Transaction(func(tx *gorm.DB) error {
Expand All @@ -81,7 +81,7 @@ func (p *Postgres) Save(msg *types.Message, config *project.Config) error {
TaskID: tid,
MessageID: m.MessageID,
ProjectID: m.ProjectID,
State: types.TaskStateReceived,
State: types.TaskStatePacked,
})
}
if err := tx.Model(message{}).Where("message_id IN ?", mids).Update("task_id", tid).Error; err != nil {
Expand All @@ -105,7 +105,7 @@ func (p *Postgres) Save(msg *types.Message, config *project.Config) error {

func (p *Postgres) Fetch() (*types.Task, error) {
t := task{}
if err := p.db.Where("state = ?", types.TaskStateReceived).First(&t).Error; err != nil {
if err := p.db.Where("state = ?", types.TaskStatePacked).First(&t).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil
}
Expand Down Expand Up @@ -149,10 +149,31 @@ func (p *Postgres) FetchByID(taskID string) (*types.Task, error) {
}, nil
}

func (p *Postgres) FetchMessage(messageID string) ([]*types.MessageWithTime, error) {
ms := []*message{}
if err := p.db.Where("message_id = ?", messageID).Find(&ms).Error; err != nil {
return nil, errors.Wrapf(err, "query message by messageID failed, messageID %s", messageID)
}

tms := []*types.MessageWithTime{}
for _, m := range ms {
tms = append(tms, &types.MessageWithTime{
Message: types.Message{
ID: m.MessageID,
ProjectID: m.ProjectID,
ProjectVersion: m.ProjectVersion,
Data: m.Data,
},
CreatedAt: m.CreatedAt,
})
}
return tms, nil
}

func (p *Postgres) FetchStateLog(messageID string) ([]*types.TaskStateLog, error) {
ts := []*task{}
if err := p.db.Where("message_id = ?", messageID).Find(&ts).Error; err != nil {
return nil, errors.Wrapf(err, "query task by message id failed, messageID %s", messageID)
return nil, errors.Wrapf(err, "query task by messageID failed, messageID %s", messageID)
}
tids := []string{}
for _, t := range ts {
Expand Down
2 changes: 1 addition & 1 deletion task/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (r *Processor) handleP2PData(d *p2p.Data, topic *pubsub.Topic) {
tid := d.Task.ID
ms := d.Task.Messages
slog.Debug("get new task", "task_id", tid)
r.reportSuccess(tid, types.TaskStateFetched, "", topic)
r.reportSuccess(tid, types.TaskStateDispatched, "", topic)

config, err := r.projectManager.Get(ms[0].ProjectID, ms[0].ProjectVersion)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions types/message.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package types

import "time"

type Message struct {
ID string `json:"id"`
ProjectID uint64 `json:"projectID"`
ProjectVersion string `json:"projectVersion"`
Data string `json:"data"`
}

type MessageWithTime struct {
Message
CreatedAt time.Time
}
12 changes: 6 additions & 6 deletions types/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type TaskState uint8

const (
TaskStateInvalid TaskState = iota
TaskStateReceived
TaskStateFetched
TaskStatePacked
TaskStateDispatched
_
TaskStateProved
_
Expand All @@ -29,10 +29,10 @@ type TaskStateLog struct {

func (s TaskState) String() string {
switch s {
case TaskStateReceived:
return "received"
case TaskStateFetched:
return "fetched"
case TaskStatePacked:
return "packed"
case TaskStateDispatched:
return "dispatched"
case TaskStateProved:
return "proved"
case TaskStateOutputted:
Expand Down

0 comments on commit 9131934

Please sign in to comment.