forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
compression.go
60 lines (48 loc) · 1.33 KB
/
compression.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package kafka
import (
"errors"
"io"
"sync"
)
const (
	// CompressionNoneCode is the codec code 0.
	// NOTE(review): presumably this denotes "no compression"; its use is
	// not visible in this file, confirm against callers.
	CompressionNoneCode = 0
	// compressionCodecMask is the bit mask 0x07, i.e. the three
	// least-significant bits.
	// NOTE(review): presumably applied to a message attributes field to
	// extract the codec code; its use is not visible in this file.
	compressionCodecMask = 0x07
)
var (
	// errUnknownCodec is the error resolveCodec reports when no codec is
	// registered under the requested code.
	errUnknownCodec = errors.New("the compression code is invalid or its codec has not been imported")
	// codecs is the package-level registry mapping a codec code to the
	// CompressionCodec registered for it.
	codecs = make(map[int8]CompressionCodec)
	// codecsMutex guards codecs: writers take the write lock, lookups take
	// the read lock.
	codecsMutex sync.RWMutex
)
// RegisterCompressionCodec adds codec to the package-level registry, keyed by
// the value of codec.Code(), so it can be used by a Writer.
//
// Registering a second codec under a code that is already present replaces
// the earlier entry.
func RegisterCompressionCodec(codec CompressionCodec) {
	// Ask the codec for its key before taking the lock so that the critical
	// section covers nothing but the map write.
	key := codec.Code()

	codecsMutex.Lock()
	defer codecsMutex.Unlock()

	codecs[key] = codec
}
// resolveCodec returns the codec registered under code, i.e. the one whose
// Code() value was used as the key at registration time.
//
// When nothing is registered under code it returns a nil codec together with
// errUnknownCodec.
func resolveCodec(code int8) (codec CompressionCodec, err error) {
	codecsMutex.RLock()
	registered := codecs[code]
	codecsMutex.RUnlock()

	// A missing key yields the nil interface value, so a single nil check
	// covers the "not registered" case.
	if registered == nil {
		return nil, errUnknownCodec
	}
	return registered, nil
}
// CompressionCodec represents a compression codec used to encode and decode
// the messages.
// See: https://cwiki.apache.org/confluence/display/KAFKA/Compression
//
// A CompressionCodec must be safe for concurrent access by multiple
// goroutines.
type CompressionCodec interface {
	// Code returns the compression codec code. RegisterCompressionCodec
	// uses this value as the key under which the codec is stored.
	Code() int8
	// Name returns a human-readable name for the codec.
	Name() string
	// NewReader constructs a new reader which decompresses data from r.
	NewReader(r io.Reader) io.ReadCloser
	// NewWriter constructs a new writer which writes compressed data to w.
	NewWriter(w io.Writer) io.WriteCloser
}