-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathoperation.go
119 lines (106 loc) · 2.98 KB
/
operation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package oplog
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"time"
"gopkg.in/mgo.v2/bson"
)
// Operation represents an operation stored in the OpLog, ready to be exposed as SSE.
//
// The bson tags give the key each field is stored under.
type Operation struct {
// ID is the object id of the operation, stored under the "_id" key. It is a
// pointer and tagged omitempty, so an operation may carry no ID at all (Info
// reports such an operation as "(new)").
ID *bson.ObjectId `bson:"_id,omitempty"`
// Event is the name of the event; Validate accepts "insert", "update" and
// "delete".
Event string `bson:"event"`
// Data is the payload that WriteTo encodes as JSON in the "data" field of the
// SSE message.
Data *OperationData `bson:"data"`
}
// OperationData is the data part of the SSE event for the operation.
//
// The bson tags give the key each field is stored under and the json tags the
// name each field has in the SSE payload.
type OperationData struct {
	// Timestamp is the modification date of the object.
	Timestamp time.Time `bson:"ts" json:"timestamp"`
	// Parents lists the parents of the object; Validate rejects empty entries.
	Parents []string `bson:"p" json:"parents"`
	// Type is the type of the object; Validate rejects an empty value.
	Type string `bson:"t" json:"type"`
	// ID is the identifier of the object; Validate rejects an empty value.
	ID string `bson:"id" json:"id"`
	// Ref is the reference URL computed by genRef. It only belongs to the JSON
	// output and must never be persisted. The bson tag has to be exactly "-"
	// for the field to be skipped: "-,omitempty" is parsed as a field named
	// "-" carrying the omitempty flag, which would store a non-empty Ref.
	Ref string `bson:"-" json:"ref,omitempty"`
}
// NewOperation creates a new operation from the given information and assigns
// it a freshly generated object id.
//
// The event argument can be one of "insert", "update" or "delete". The time
// argument defines the exact modification date of the object (it must be the
// exact same time as the one stored in the database). objID, objType and
// objParents are copied as is into the data part of the operation.
func NewOperation(event string, time time.Time, objID, objType string, objParents []string) *Operation {
	data := &OperationData{
		Parents:   objParents,
		Type:      objType,
		ID:        objID,
		Timestamp: time,
	}
	op := &Operation{Event: event, Data: data}
	oid := bson.NewObjectId()
	op.ID = &oid
	return op
}
// GetEventID returns the SSE last event id of the operation: an
// OperationLastID wrapping the operation's ID.
func (op Operation) GetEventID() LastID {
	lastID := OperationLastID{op.ID}
	return &lastID
}
// Validate ensures an operation has the proper syntax: the event must be one
// of "insert", "update" or "delete", the data part must be present and must
// itself pass OperationData.Validate.
//
// It returns an error describing the first problem found, or nil when the
// operation is valid.
func (op Operation) Validate() error {
	switch op.Event {
	case "insert", "update", "delete":
	default:
		return fmt.Errorf("invalid event name: %s", op.Event)
	}
	// Data is a pointer: calling Validate on a nil one would dereference it
	// and panic, so a missing data part is reported as a regular error.
	if op.Data == nil {
		return errors.New("missing data field")
	}
	return op.Data.Validate()
}
// WriteTo serializes the operation as an SSE compatible message and writes it
// to w. The message carries the hex form of the operation ID as "id", the
// event name as "event" and the JSON encoding of Data as "data", followed by
// a blank line.
//
// It returns the number of bytes written along with any write error. When
// Data cannot be encoded as JSON, nothing is written and the encoding error is
// returned.
func (op Operation) WriteTo(w io.Writer) (int64, error) {
	const sseMessage = "id: %s\nevent: %s\ndata: %s\n\n"

	payload, err := json.Marshal(op.Data)
	if err != nil {
		return 0, err
	}
	written, err := fmt.Fprintf(w, sseMessage, op.ID.Hex(), op.Event, payload)
	return int64(written), err
}
// Info returns a human readable version of the operation, formatted as
// "<id>:<event>(<type>:<object id>)". The id part is the hex form of the
// operation ID, or "(new)" when the operation has no ID.
func (op *Operation) Info() string {
	var label string
	if op.ID == nil {
		label = "(new)"
	} else {
		label = op.ID.Hex()
	}
	data := op.Data
	return fmt.Sprintf("%s:%s(%s:%s)", label, op.Event, data.Type, data.ID)
}
// genRef sets the Ref field to the reference URL built from the given object
// URL template, replacing the "{{type}}" and "{{id}}" placeholders with the
// Type and ID fields. An empty template resets Ref to the empty string.
func (obd *OperationData) genRef(objectURL string) {
	ref := ""
	if objectURL != "" {
		ref = strings.NewReplacer("{{type}}", obd.Type, "{{id}}", obd.ID).Replace(objectURL)
	}
	obd.Ref = ref
}
// GetID returns the operation id, made of the Type and ID fields joined by a
// slash ("<type>/<id>").
func (obd OperationData) GetID() string {
	buf := bytes.NewBufferString(obd.Type)
	buf.WriteByte('/')
	buf.WriteString(obd.ID)
	return buf.String()
}
// Validate ensures an operation data has the right syntax: the ID and Type
// fields must be set and no entry of Parents may be empty.
//
// It returns an error for the first problem found (ID is checked first, then
// Type, then the parents in order), or nil when the data is valid.
func (obd OperationData) Validate() error {
	switch {
	case obd.ID == "":
		return errors.New("missing id field")
	case obd.Type == "":
		return errors.New("missing type field")
	}
	for i := range obd.Parents {
		if obd.Parents[i] == "" {
			return errors.New("parent can't be empty")
		}
	}
	return nil
}