This repository was archived by the owner on Oct 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: writer.go
237 lines (214 loc) · 4.92 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
package recache
import (
"bytes"
"compress/flate"
"crypto/sha1"
"hash"
"hash/adler32"
"io"
)
// Describes a single constituent deflate-compressed frame of a record
type frameDescriptor struct {
	checksum uint32 // Adler32 checksum
	size     uint32 // Uncompressed size
}

// Merge the frame described by other into f, producing the descriptor of the
// two uncompressed streams concatenated
func (f *frameDescriptor) append(other frameDescriptor) {
	// Total uncompressed size; wrapping on overflow is intentional
	f.size += other.size

	// Combine the two Adler32 checksums without access to the underlying
	// data. Based on adler32_combine() from zlib.
	// Copyright (C) 1995-2011, 2016 Mark Adler
	const base = uint32(65521) // largest prime smaller than 65536

	rem := other.size % base
	low := f.checksum & 0xffff
	high := (rem * low) % base
	low += (other.checksum & 0xffff) + base - 1
	high += ((f.checksum >> 16) & 0xffff) +
		((other.checksum >> 16) & 0xffff) +
		base -
		rem

	// Reduce both halves back below the modulus
	if low >= base {
		low -= base
	}
	if low >= base {
		low -= base
	}
	if high >= base<<1 {
		high -= base << 1
	}
	if high >= base {
		high -= base
	}
	f.checksum = low | (high << 16)
}
// Provides utility methods for building record buffers and recursive record
// trees
type RecordWriter struct {
	compressing bool // Currently compressing data into a buffer

	// Identify the location (cache, frontend, key) of the record being
	// written; used when registering eviction dependencies in bind()
	cache, frontend int
	key             Key

	compressor *flate.Writer // Lazily initialized on first Write()
	current    struct {      // Deflate frame currently being compressed
		bytes.Buffer
		size uint32 // Uncompressed byte count of the current frame
	}
	hasher hash.Hash32 // Adler32 checksum builder

	// Singly-linked list of components making up the record; data is the
	// head node stored inline, last points to the tail for O(1) appends
	data componentNode
	last *componentNode
}
// Write non-compressed data to the record for storage.
// Starts a new deflate frame, if one is not already in progress.
func (rw *RecordWriter) Write(p []byte) (n int, err error) {
	if !rw.compressing {
		// Set up a fresh frame, reusing previously allocated
		// resources where possible
		if rw.compressor != nil {
			rw.current.Reset()
			rw.current.size = 0
			rw.hasher.Reset()
			rw.compressor.Reset(&rw.current)
		} else {
			rw.compressor, err = flate.NewWriter(
				&rw.current,
				CompressionLevel,
			)
			if err != nil {
				return
			}
			rw.hasher = adler32.New()
		}
		rw.compressing = true
	}

	if n, err = rw.compressor.Write(p); err != nil {
		return
	}
	rw.current.size += uint32(n) // Overflow is tolerated by design
	_, err = rw.hasher.Write(p)
	return
}
// Read non-compressed data from r until EOF and write it to the record for
// storage.
//
// n is the total number of bytes read. Returns any error encountered while
// reading (other than io.EOF, which terminates the copy successfully) or
// while compressing the data into the record.
func (rw *RecordWriter) ReadFrom(r io.Reader) (n int64, err error) {
	var arr [512]byte
	for {
		m, rerr := r.Read(arr[:])
		n += int64(m)

		// Per the io.Reader contract, process any returned bytes
		// before considering the error: a reader is allowed to return
		// both m > 0 and a non-nil error (including io.EOF) from the
		// same call. The previous implementation dropped such final
		// bytes.
		if m > 0 {
			if _, err = rw.Write(arr[:m]); err != nil {
				return
			}
		}

		switch rerr {
		case nil:
			// Keep reading
		case io.EOF:
			// Successful termination
			return
		default:
			err = rerr
			return
		}
	}
}
// Include data from passed frontend by key and bind it to rw.
// The record generated by rw will automatically be evicted from its parent
// cache on eviction of the included record.
func (rw *RecordWriter) Include(f *Frontend, k Key) (err error) {
	var rec *Record
	if rec, err = rw.bind(f, k); err != nil {
		return
	}
	ref := recordReference{
		componentCommon: componentCommon{
			hash: rec.hash,
		},
		Record: rec,
	}
	rw.append(ref)
	return
}
// Retrieve (generating, if needed) the record of frontend f under key k and
// register rw's own record as dependant on it, so that the two are evicted
// together
func (rw *RecordWriter) bind(f *Frontend, k Key) (rec *Record, err error) {
	// Commit any buffered uncompressed writes first
	if err = rw.flush(false); err != nil {
		return
	}
	if rec, err = f.getGeneratedRecord(k); err != nil {
		return
	}

	parent := intercacheRecordLocation{
		cache: rw.cache,
		recordLocation: recordLocation{
			frontend: rw.frontend,
			key:      rw.key,
		},
	}
	child := intercacheRecordLocation{
		cache: f.cache.id,
		recordLocation: recordLocation{
			frontend: f.id,
			key:      k,
		},
	}
	registerDependance(parent, child)
	return
}
// Bind to record from passed frontend by key and return the retrieved record.
//
// The record generated by rw will automatically be evicted from its parent
// cache on eviction of the included record.
func (rw *RecordWriter) Bind(f *Frontend, k Key) (rec *Record, err error) {
	rec, err = rw.bind(f, k)
	return
}
// Bind to record from passed frontend by key and decode it as JSON into dst.
//
// The record generated by rw will automatically be evicted from its parent
// cache on eviction of the included record.
func (rw *RecordWriter) BindJSON(
	f *Frontend,
	k Key,
	dst interface{},
) error {
	rec, err := rw.Bind(f, k)
	if err != nil {
		return err
	}
	return rec.DecodeJSON(dst)
}
// Terminate the current deflate stream, if any, and append the resulting
// frame to the record's component list.
//
// final: no more writes will follow, so the compression buffer's contents
// may be referenced directly instead of copied
func (rw *RecordWriter) flush(final bool) (err error) {
	if !rw.compressing {
		return
	}
	if err = rw.compressor.Flush(); err != nil {
		return
	}

	var buf buffer
	if final {
		// Safe to alias the compressor's buffer: it won't be reused
		buf.data = rw.current.Bytes()
	} else {
		// The buffer will be reset and overwritten by the next
		// frame, so copy its contents out
		buf.data = make([]byte, rw.current.Len())
		copy(buf.data, rw.current.Bytes())
	}
	buf.hash = sha1.Sum(buf.data)
	buf.size = rw.current.size
	buf.frameDescriptor.checksum = rw.hasher.Sum32()
	rw.append(buf)

	rw.compressing = false
	return
}
// Append new component to the tail of the linked list
func (rw *RecordWriter) append(c component) {
	if rw.last != nil {
		node := &componentNode{
			component: c,
		}
		rw.last.next = node
		rw.last = node
		return
	}

	// First component is stored inline in rw.data
	rw.data = componentNode{
		component: c,
	}
	rw.last = &rw.data
}