-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcodec.go
159 lines (128 loc) · 3.22 KB
/
codec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package mux
import (
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"io/ioutil"
"sync"
"bufio"
)
// codec translates between Frames and their representation on the wire.
type codec interface {
	// decodeFrame reads a single frame from the wire. It is run serially.
	decodeFrame(in io.Reader) (Frame, error)

	// encodeFrame writes frame to out. It may be run in parallel. The types
	// of Messages in frames are determined by the protocol version.
	encodeFrame(out *bufio.Writer, frame *Frame) error
}
// baseCodec is the base codec implementation. It doesn't support message
// fragmenting.
type baseCodec struct {
	// writeLock is held by encodeFrame while a frame is encoded and flushed.
	writeLock sync.Mutex
}
// decodeFrame reads one frame from in by delegating to DecodeFrame and
// returns its results unchanged.
func (c *baseCodec) decodeFrame(in io.Reader) (Frame, error) {
	frame, err := DecodeFrame(in)
	return frame, err
}
// encodeFrame writes frame to out with EncodeFrame and then flushes out.
//
// writeLock is held for the whole encode-and-flush sequence, which serializes
// calls made through the same codec. If encoding fails the error is returned
// and out is not flushed; otherwise the result of the flush is returned.
func (c *baseCodec) encodeFrame(out *bufio.Writer, frame *Frame) (err error) {
	c.writeLock.Lock()
	defer c.writeLock.Unlock()

	if err = EncodeFrame(out, frame); err == nil {
		// Push the buffered frame out to the underlying writer.
		err = out.Flush()
	}
	return err
}
// isFrameFragment reports whether streamId has any of the bits selected by
// FragmentMask set.
func isFrameFragment(streamId int32) bool {
	fragmentBits := streamId & FragmentMask
	return fragmentBits != 0
}
// encodeBytesInt16 writes bytes to writer, preceded by its length encoded as
// a big-endian 16-bit integer.
//
// If len(bytes) exceeds math.MaxInt16 an error is returned and nothing is
// written. Otherwise the first error reported by writer, if any, is returned;
// when writing the length prefix fails the payload is not written.
func encodeBytesInt16(writer io.Writer, bytes []byte) (err error) {
	length := len(bytes)
	if length > math.MaxInt16 {
		return fmt.Errorf("Context field overflow. Length: %d", length)
	}

	// length is in [0, MaxInt16], so its uint16 encoding is identical to the
	// int16 encoding expected on the wire.
	var prefix [2]byte
	binary.BigEndian.PutUint16(prefix[:], uint16(length))
	if _, err = writer.Write(prefix[:]); err != nil {
		return err
	}

	_, err = writer.Write(bytes)
	return err
}
// decodeFrame reads one frame occupying size bytes from in.
//
// The frame starts with a 4-byte big-endian header. The most significant
// byte of the header is the frame type, and the stream id is the header
// masked with MaxStreamId. The rest of the frame is decoded as the message
// by decodeStandardFrame. Reads are capped at size bytes in total.
func decodeFrame(in io.Reader, size int32) (frame Frame, err error) {
	// Cap every read below at the frame boundary.
	bounded := io.LimitReader(in, int64(size))

	var hdr int32
	if err = binary.Read(bounded, binary.BigEndian, &hdr); err != nil {
		return frame, err
	}

	// Most significant byte is the type.
	kind := int8((hdr >> 24) & 0xff)
	frame.streamId = hdr & MaxStreamId

	// The header has already consumed 4 of the frame's bytes.
	frame.message, err = decodeStandardFrame(bounded, kind, size-4)
	return frame, err
}
// decodeStandardFrame decodes the message of a frame of type tpe from in.
// It expects the reader to signal EOF at the end of the frame.
//
// Dispatch frames are handed to decodeTdispatch / decodeRdispatch together
// with size. Ping frames produce an empty message without reading from in.
// Init frames are read with decodeInit. For RerrTpe and BadRerrTpe everything
// left in the reader becomes the error text of an Rerr. Any other type yields
// an error and a nil message.
func decodeStandardFrame(in io.Reader, tpe int8, size int32) (msg Message, err error) {
	switch tpe {
	// TODO: fragments are handled incorrectly
	case TdispatchTpe:
		msg, err = decodeTdispatch(in, size)

	case RdispatchTpe:
		msg, err = decodeRdispatch(in, size)

	case TpingTpe:
		msg = &Tping{}

	case RpingTpe:
		msg = &Rping{}

	case TinitTpe:
		version, headers, initErr := decodeInit(in)
		// The message is built even when decodeInit fails; err reports it.
		msg, err = &Tinit{version: version, headers: headers}, initErr

	case RinitTpe:
		version, headers, initErr := decodeInit(in)
		msg, err = &Rinit{version: version, headers: headers}, initErr

	case RerrTpe, BadRerrTpe:
		// The remaining bytes of the frame are the error text.
		payload, readErr := ioutil.ReadAll(in)
		msg, err = &Rerr{error: string(payload)}, readErr

	default:
		err = errors.New(fmt.Sprintf("Found invalid frame type: %d", tpe))
	}
	return msg, err
}
// readInt32Slice reads a big-endian int32 length prefix from input followed
// by that many bytes, and returns those bytes.
//
// An error is returned if the prefix cannot be read, if it is negative, or
// if input ends before the announced number of bytes has been read. In the
// last case data holds whatever was read, padded with zero bytes.
//
// NOTE(review): the length has no upper bound here, so a prefix close to
// math.MaxInt32 allocates that much memory before any payload is read;
// callers handling untrusted input may want to limit it.
func readInt32Slice(input io.Reader) (data []byte, err error) {
	var fieldLen int32
	if err = binary.Read(input, binary.BigEndian, &fieldLen); err != nil {
		return nil, err
	}
	// make panics on a negative length; report malformed input instead.
	if fieldLen < 0 {
		return nil, fmt.Errorf("negative int32 length prefix: %d", fieldLen)
	}

	data = make([]byte, fieldLen)
	_, err = io.ReadFull(input, data)
	return data, err
}
// decodeInt16Bytes reads a big-endian int16 length prefix from input followed
// by that many bytes, and returns those bytes.
//
// An empty slice and an error are returned if the prefix cannot be read or
// is negative. If input ends before the announced number of bytes has been
// read, the partially filled slice is returned together with the error.
func decodeInt16Bytes(input io.Reader) ([]byte, error) {
	var length int16
	// binary.Read must be given a pointer; a plain int16 value is rejected
	// as an invalid type and nothing could ever be decoded.
	if err := binary.Read(input, binary.BigEndian, &length); err != nil {
		return []byte{}, err
	}
	// make panics on a negative length; report malformed input instead.
	if length < 0 {
		return []byte{}, fmt.Errorf("negative int16 length prefix: %d", length)
	}

	data := make([]byte, length)
	_, err := io.ReadFull(input, data)
	return data, err
}