-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathcolumnarmux.go
118 lines (93 loc) · 2.45 KB
/
columnarmux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package gocbcore
import (
"bytes"
"fmt"
"sync/atomic"
"unsafe"
)
// columnarMux holds the currently installed columnarClientMux behind a pointer
// that its methods load and replace using sync/atomic operations.
type columnarMux struct {
// muxPtr stores a *columnarClientMux as an unsafe.Pointer so it can be used
// with the sync/atomic pointer functions; nil means no mux is installed.
muxPtr unsafe.Pointer
// cfgMgr is the config manager this mux is added to, and removed from, as a
// config watcher.
cfgMgr configManager
}
// newColumnarMux creates a columnarMux whose initial state is muxState and
// adds it to cfgMgr as a config watcher before returning it.
func newColumnarMux(cfgMgr configManager, muxState *columnarClientMux) *columnarMux {
	m := new(columnarMux)
	m.muxPtr = unsafe.Pointer(muxState)
	m.cfgMgr = cfgMgr

	cfgMgr.AddConfigWatcher(m)

	return m
}
// Get atomically loads the currently installed columnarClientMux.
// The result is nil when no mux is installed.
func (mux *columnarMux) Get() *columnarClientMux {
	current := atomic.LoadPointer(&mux.muxPtr)
	return (*columnarClientMux)(current)
}
// Update atomically replaces the installed columnarClientMux with newMux.
//
// The replacement only happens when the currently installed mux is exactly
// old. Passing a nil old means "install only if nothing is installed yet".
// A nil newMux is always rejected.
//
// It returns true when newMux was installed and false otherwise. A rejected
// update never modifies the installed mux.
func (mux *columnarMux) Update(old, newMux *columnarClientMux) bool {
	if newMux == nil {
		logErrorf("Attempted to update to nil columnarClientMux")
		return false
	}

	// A single compare-and-swap covers both cases: converting a nil old yields
	// a nil unsafe.Pointer, so the nil case installs newMux only when nothing
	// is installed. An unconditional swap here would overwrite a live mux and
	// only report the failure afterwards.
	if atomic.CompareAndSwapPointer(&mux.muxPtr, unsafe.Pointer(old), unsafe.Pointer(newMux)) {
		return true
	}

	if old == nil {
		logErrorf("Updated from nil attempted on initialized columnarClientMux")
	}

	return false
}
// Clear atomically swaps the installed columnarClientMux for nil and returns
// the mux that was installed beforehand (nil if there was none).
func (mux *columnarMux) Clear() *columnarClientMux {
	return (*columnarClientMux)(atomic.SwapPointer(&mux.muxPtr, nil))
}
// OnNewRouteConfig builds a new columnarClientMux from cfg and tries to
// install it in place of the current one.
//
// The TLS config and auth provider are carried over from the current mux.
// The SSL endpoint list from cfg is used when the current mux has a TLS
// config, otherwise the non-SSL list is used. If no mux is installed the
// config is ignored with a warning. A failed install is only logged.
func (mux *columnarMux) OnNewRouteConfig(cfg *routeConfig) {
	oldMux := mux.Get()
	if oldMux == nil {
		logWarnf("HTTP mux received new route config after shutdown")
		return
	}

	var endpoints []routeEndpoint
	if oldMux.tlsConfig != nil {
		endpoints = cfg.cbasEpList.SSLEndpoints
	} else {
		endpoints = cfg.cbasEpList.NonSSLEndpoints
	}

	var buffer bytes.Buffer
	buffer.WriteString("Columnar muxer applying endpoints:\n")
	buffer.WriteString("Columnar Eps:\n")
	for _, ep := range endpoints {
		fmt.Fprintf(&buffer, " - %s\n", ep.Address)
	}
	// Pass the assembled text as an argument rather than as the format string,
	// so a '%' inside an endpoint address is not interpreted as a verb.
	logDebugf("%s", buffer.String())

	// Update is keyed on oldMux, so it fails if the installed mux changed
	// (or was cleared) since it was loaded above.
	updated := newColumnarClientMux(cfg, endpoints, oldMux.tlsConfig, oldMux.auth)
	if !mux.Update(oldMux, updated) {
		logDebugf("Failed to update columnar mux")
	}
}
// ColumnarEps returns the endpoint list of the currently installed mux, or
// nil when no mux is installed.
func (mux *columnarMux) ColumnarEps() []routeEndpoint {
	if current := mux.Get(); current != nil {
		return current.epList
	}

	return nil
}
// ConfigRev returns the revID of the currently installed mux. When no mux is
// installed it returns 0 and errShutdown.
func (mux *columnarMux) ConfigRev() (int64, error) {
	if current := mux.Get(); current != nil {
		return current.revID, nil
	}

	return 0, errShutdown
}
// Close removes this mux from the config manager's watchers and then clears
// the installed columnarClientMux. It always returns nil.
func (mux *columnarMux) Close() error {
	mux.cfgMgr.RemoveConfigWatcher(mux)

	// The previously installed mux is not needed here, so it is discarded.
	_ = mux.Clear()

	return nil
}
// Auth returns the auth provider held by the currently installed mux, or nil
// when no mux is installed.
func (mux *columnarMux) Auth() AuthProvider {
	if current := mux.Get(); current != nil {
		return current.auth
	}

	return nil
}