Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: allow hot swapping the config #192

Merged
merged 64 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
cf9b7d2
Refactor how we create listeners.
sbruens Jul 11, 2024
ae7f41d
Update comments.
sbruens Jul 11, 2024
120db8e
`go mod tidy`.
sbruens Jul 11, 2024
d705603
refactor: don't link the TCP handler to a specific listener
sbruens Jul 16, 2024
2fb4a6b
Merge branch 'sbruens/remove-listener-dependency' into sbruens/shared…
sbruens Jul 16, 2024
d2ef46e
Protect new cipher handling methods with mutex.
sbruens Jul 16, 2024
ab07400
Move `listeners.go` under `/service`.
sbruens Jul 16, 2024
71d7140
Use callback instead of passing in key and manager.
sbruens Jul 16, 2024
9dfa4e2
Move config start into a go routine for easier cleanup.
sbruens Jul 16, 2024
0a63f5c
Make a `StreamListener` type.
sbruens Jul 19, 2024
f018d17
Rename `closeFunc` to `onCloseFunc`.
sbruens Jul 19, 2024
4295c45
Rename `globalListener`.
sbruens Jul 19, 2024
e6963f6
Don't track usage in the shared listeners.
sbruens Jul 19, 2024
7113f02
Add `getAddr()` to avoid some duplicate code.
sbruens Jul 19, 2024
e4d679f
Move listener set creation out of the inner function.
sbruens Jul 22, 2024
be5f9b0
Remove `PushBack()` from `CipherList`.
sbruens Jul 22, 2024
343e412
Move listener set to `main.go`.
sbruens Jul 22, 2024
7f86ff1
Close the accept channel with an atomic value.
sbruens Jul 22, 2024
e80b2c5
Update comment.
sbruens Jul 22, 2024
b1428ed
Address review comments.
sbruens Jul 22, 2024
1c16de8
Close before deleting key.
sbruens Jul 22, 2024
ebc7053
`server.Stop()` does not return a value
sbruens Jul 22, 2024
67fc7fb
Add a comment for `StreamListener`.
sbruens Jul 22, 2024
7a15e7d
Do not delete the listener from the manager until the last user has c…
sbruens Jul 22, 2024
499829e
Consolidate usage counting inside a `listenAddress` type.
sbruens Jul 22, 2024
f165dbd
Remove `atomic.Value`.
sbruens Jul 22, 2024
2a2420a
Add some missing comments.
sbruens Jul 22, 2024
8178d78
Merge branch 'master' into sbruens/shared-listeners
sbruens Jul 25, 2024
cccba1a
address review comments
sbruens Jul 25, 2024
da4ccaa
Add type guard for `sharedListener`.
sbruens Jul 25, 2024
d47f612
Stop the existing config in a goroutine.
sbruens Jul 25, 2024
a928e2c
Add a TODO to wait for all handlers to be stopped.
sbruens Jul 25, 2024
98cc3a0
Run `stopConfig` in a goroutine in `Stop()` as well.
sbruens Jul 25, 2024
48d0931
Create a `TCPListener` that implements a `StreamListener`.
sbruens Jul 25, 2024
2dec847
Track close functions instead of the entire listener, which is not ne…
sbruens Jul 25, 2024
ab22e47
Delegate usage tracking to a reference counter.
sbruens Jul 30, 2024
3c2a3ef
Remove the `Get()` method from `refCount`.
sbruens Jul 31, 2024
5e282f1
Return immediately.
sbruens Jul 31, 2024
547e9e6
Rename `shared` to `virtual` as they are not actually shared.
sbruens Jul 31, 2024
c6774c8
Simplify `listenAddr`.
sbruens Jul 31, 2024
df2f9d0
Fix use of the ref count.
sbruens Jul 31, 2024
c678372
Add simple test case for early closing of stream listener.
sbruens Jul 31, 2024
e41abab
Add tests for creating stream listeners.
sbruens Jul 31, 2024
6b11f4f
Refactor create methods.
sbruens Aug 2, 2024
fe8bbdd
Address review comments.
sbruens Aug 5, 2024
36a0a1d
Use a mutex to ensure another user doesn't acquire a new closer while…
sbruens Aug 5, 2024
aeb2652
Move mutex up.
sbruens Aug 6, 2024
8873b10
Manage the ref counting next to the listener creation.
sbruens Aug 6, 2024
899d13d
Do the lazy initialization inside an anonymous function.
sbruens Aug 6, 2024
80e5d49
Fix concurrent access to `acceptCh` and `closeCh`.
sbruens Aug 7, 2024
aa00f2e
Use `/` in key instead of `-`.
sbruens Aug 7, 2024
e658b90
Return error from stopping listeners.
sbruens Aug 7, 2024
fede4d8
Use channels to ensure `virtualPacketConn`s get closed.
sbruens Aug 7, 2024
4730d74
Add more test cases for packet listeners.
sbruens Aug 7, 2024
458cf41
Only log errors from stopping old configs.
sbruens Aug 9, 2024
81bf20e
Remove the `closed` field from the virtual listeners.
sbruens Aug 9, 2024
53b1e96
Remove the `RefCount`.
sbruens Aug 9, 2024
8f9f1ea
Implement channel-based packet read for virtual connections.
sbruens Aug 9, 2024
1ac265d
Use a done channel.
sbruens Aug 9, 2024
1538a9a
Set listeners and `onCloseFunc`'s to nil when closing.
sbruens Aug 14, 2024
4df0b9f
Set `onCloseFunc`'s to nil when closing.
sbruens Aug 14, 2024
16feaf9
Fix race condition.
sbruens Aug 14, 2024
288b88b
Add some benchmarks for listener manager.
sbruens Aug 14, 2024
d688dd9
Merge branch 'master' into sbruens/shared-listeners
sbruens Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 157 additions & 82 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -55,129 +57,202 @@ func init() {
)
}

type ssPort struct {
tcpListener *net.TCPListener
packetConn net.PacketConn
cipherList service.CipherList
}

type SSServer struct {
stopConfig func() error
lnManager service.ListenerManager
natTimeout time.Duration
m *outlineMetrics
replayCache service.ReplayCache
ports map[int]*ssPort
}

func (s *SSServer) startPort(portNum int) error {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{Port: portNum})
func (s *SSServer) loadConfig(filename string) error {
config, err := readConfig(filename)
if err != nil {
//lint:ignore ST1005 Shadowsocks is capitalized.
return fmt.Errorf("Shadowsocks TCP service failed to start on port %v: %w", portNum, err)
return fmt.Errorf("failed to load config (%v): %w", filename, err)
}
slog.Info("Shadowsocks TCP service started.", "address", listener.Addr().String())
packetConn, err := net.ListenUDP("udp", &net.UDPAddr{Port: portNum})
// We hot swap the config by having the old and new listeners both live at
// the same time. This means we create listeners for the new config first,
// and then close the old ones after.
stopConfig, err := s.runConfig(*config)
if err != nil {
//lint:ignore ST1005 Shadowsocks is capitalized.
return fmt.Errorf("Shadowsocks UDP service failed to start on port %v: %w", portNum, err)
return err
}
slog.Info("Shadowsocks UDP service started.", "address", packetConn.LocalAddr().String())
port := &ssPort{tcpListener: listener, packetConn: packetConn, cipherList: service.NewCipherList()}
authFunc := service.NewShadowsocksStreamAuthenticator(port.cipherList, &s.replayCache, s.m)
// TODO: Register initial data metrics at zero.
tcpHandler := service.NewTCPHandler(authFunc, s.m, tcpReadTimeout)
packetHandler := service.NewPacketHandler(s.natTimeout, port.cipherList, s.m)
s.ports[portNum] = port
go service.StreamServe(service.WrapStreamListener(listener.AcceptTCP), tcpHandler.Handle)
go packetHandler.Handle(port.packetConn)
if err := s.Stop(); err != nil {
slog.Warn("Failed to stop old config.", "err", err)
}
s.stopConfig = stopConfig
return nil
}

func (s *SSServer) removePort(portNum int) error {
port, ok := s.ports[portNum]
if !ok {
return fmt.Errorf("port %v doesn't exist", portNum)
}
tcpErr := port.tcpListener.Close()
udpErr := port.packetConn.Close()
delete(s.ports, portNum)
if tcpErr != nil {
//lint:ignore ST1005 Shadowsocks is capitalized.
return fmt.Errorf("Shadowsocks TCP service on port %v failed to stop: %w", portNum, tcpErr)
func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList) service.StreamHandler {
authFunc := service.NewShadowsocksStreamAuthenticator(ciphers, &s.replayCache, s.m)
// TODO: Register initial data metrics at zero.
return service.NewStreamHandler(authFunc, s.m, tcpReadTimeout)
}

func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList) service.PacketHandler {
return service.NewPacketHandler(s.natTimeout, ciphers, s.m)
}

type listenerSet struct {
manager service.ListenerManager
listenerCloseFuncs map[string]func() error
listenersMu sync.Mutex
}

// ListenStream announces on a given network address. Trying to listen for stream connections
// on the same address twice will result in an error.
func (ls *listenerSet) ListenStream(addr string) (service.StreamListener, error) {
ls.listenersMu.Lock()
defer ls.listenersMu.Unlock()

lnKey := "stream/" + addr
if _, exists := ls.listenerCloseFuncs[lnKey]; exists {
return nil, fmt.Errorf("stream listener for %s already exists", addr)
}
slog.Info("Shadowsocks TCP service stopped.", "port", portNum)
if udpErr != nil {
//lint:ignore ST1005 Shadowsocks is capitalized.
return fmt.Errorf("Shadowsocks UDP service on port %v failed to stop: %w", portNum, udpErr)
ln, err := ls.manager.ListenStream(addr)
if err != nil {
return nil, err
}
slog.Info("Shadowsocks UDP service stopped.", "port", portNum)
return nil
ls.listenerCloseFuncs[lnKey] = ln.Close
return ln, nil
}

func (s *SSServer) loadConfig(filename string) error {
config, err := readConfig(filename)
// ListenPacket announces on a given network address. Trying to listen for packet connections
// on the same address twice will result in an error.
func (ls *listenerSet) ListenPacket(addr string) (net.PacketConn, error) {
ls.listenersMu.Lock()
defer ls.listenersMu.Unlock()

lnKey := "packet/" + addr
if _, exists := ls.listenerCloseFuncs[lnKey]; exists {
return nil, fmt.Errorf("packet listener for %s already exists", addr)
}
ln, err := ls.manager.ListenPacket(addr)
if err != nil {
return fmt.Errorf("failed to load config (%v): %w", filename, err)
return nil, err
}
ls.listenerCloseFuncs[lnKey] = ln.Close
return ln, nil
}

portChanges := make(map[int]int)
portCiphers := make(map[int]*list.List) // Values are *List of *CipherEntry.
for _, keyConfig := range config.Keys {
portChanges[keyConfig.Port] = 1
cipherList, ok := portCiphers[keyConfig.Port]
if !ok {
cipherList = list.New()
portCiphers[keyConfig.Port] = cipherList
}
cryptoKey, err := shadowsocks.NewEncryptionKey(keyConfig.Cipher, keyConfig.Secret)
if err != nil {
return fmt.Errorf("failed to create encyption key for key %v: %w", keyConfig.ID, err)
// Close closes all the listeners in the set, after which the set can't be used again.
func (ls *listenerSet) Close() error {
ls.listenersMu.Lock()
defer ls.listenersMu.Unlock()

for addr, listenerCloseFunc := range ls.listenerCloseFuncs {
if err := listenerCloseFunc(); err != nil {
return fmt.Errorf("listener on address %s failed to stop: %w", addr, err)
}
entry := service.MakeCipherEntry(keyConfig.ID, cryptoKey, keyConfig.Secret)
cipherList.PushBack(&entry)
}
for port := range s.ports {
portChanges[port] = portChanges[port] - 1
}
for portNum, count := range portChanges {
if count == -1 {
if err := s.removePort(portNum); err != nil {
return fmt.Errorf("failed to remove port %v: %w", portNum, err)
ls.listenerCloseFuncs = nil
return nil
}

// Len returns the number of listeners in the set.
func (ls *listenerSet) Len() int {
return len(ls.listenerCloseFuncs)
}

func (s *SSServer) runConfig(config Config) (func() error, error) {
startErrCh := make(chan error)
stopErrCh := make(chan error)
stopCh := make(chan struct{})

go func() {
lnSet := &listenerSet{
manager: s.lnManager,
listenerCloseFuncs: make(map[string]func() error),
}
defer func() {
stopErrCh <- lnSet.Close()
}()

startErrCh <- func() error {
portCiphers := make(map[int]*list.List) // Values are *List of *CipherEntry.
for _, keyConfig := range config.Keys {
cipherList, ok := portCiphers[keyConfig.Port]
if !ok {
cipherList = list.New()
portCiphers[keyConfig.Port] = cipherList
}
cryptoKey, err := shadowsocks.NewEncryptionKey(keyConfig.Cipher, keyConfig.Secret)
if err != nil {
return fmt.Errorf("failed to create encyption key for key %v: %w", keyConfig.ID, err)
}
entry := service.MakeCipherEntry(keyConfig.ID, cryptoKey, keyConfig.Secret)
cipherList.PushBack(&entry)
}
} else if count == +1 {
if err := s.startPort(portNum); err != nil {
return err
for portNum, cipherList := range portCiphers {
addr := net.JoinHostPort("::", strconv.Itoa(portNum))

ciphers := service.NewCipherList()
ciphers.Update(cipherList)

sh := s.NewShadowsocksStreamHandler(ciphers)
ln, err := lnSet.ListenStream(addr)
if err != nil {
return err
}
slog.Info("Shadowsocks TCP service started.", "address", ln.Addr().String())
go service.StreamServe(ln.AcceptStream, sh.Handle)

pc, err := lnSet.ListenPacket(addr)
if err != nil {
return err
}
slog.Info("Shadowsocks UDP service started.", "address", pc.LocalAddr().String())
ph := s.NewShadowsocksPacketHandler(ciphers)
go ph.Handle(pc)
}
}
}
for portNum, cipherList := range portCiphers {
s.ports[portNum].cipherList.Update(cipherList)
slog.Info("Loaded config.", "access keys", len(config.Keys), "listeners", lnSet.Len())
s.m.SetNumAccessKeys(len(config.Keys), lnSet.Len())
return nil
}()

<-stopCh
}()

err := <-startErrCh
if err != nil {
return nil, err
}
slog.Info("Loaded config.", "access keys", len(config.Keys), "ports", len(s.ports))
s.m.SetNumAccessKeys(len(config.Keys), len(portCiphers))
return nil
return func() error {
slog.Info("Stopping running config.")
// TODO(sbruens): Actually wait for all handlers to be stopped, e.g. by
// using a https://pkg.go.dev/sync#WaitGroup.
stopCh <- struct{}{}
stopErr := <-stopErrCh
return stopErr
}, nil
}

// Stop serving on all ports.
// Stop stops serving the current config.
func (s *SSServer) Stop() error {
for portNum := range s.ports {
if err := s.removePort(portNum); err != nil {
return err
}
stopFunc := s.stopConfig
if stopFunc == nil {
return nil
}
if err := stopFunc(); err != nil {
slog.Error("Error stopping config.", "err", err)
return err
}
slog.Info("Stopped all listeners for running config.")
return nil
}

// RunSSServer starts a shadowsocks server running, and returns the server or an error.
func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetrics, replayHistory int) (*SSServer, error) {
server := &SSServer{
lnManager: service.NewListenerManager(),
natTimeout: natTimeout,
m: sm,
replayCache: service.NewReplayCache(replayHistory),
ports: make(map[int]*ssPort),
}
err := server.loadConfig(filename)
if err != nil {
return nil, fmt.Errorf("failed configure server: %w", err)
return nil, fmt.Errorf("failed to configure server: %w", err)
}
sigHup := make(chan os.Signal, 1)
signal.Notify(sigHup, syscall.SIGHUP)
Expand Down
14 changes: 7 additions & 7 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestTCPEcho(t *testing.T) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
handler := service.NewTCPHandler(authFunc, testMetrics, testTimeout)
handler := service.NewStreamHandler(authFunc, testMetrics, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -202,10 +202,10 @@ func TestRestrictedAddresses(t *testing.T) {
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := service.NewTCPHandler(authFunc, testMetrics, testTimeout)
handler := service.NewStreamHandler(authFunc, testMetrics, testTimeout)
done := make(chan struct{})
go func() {
service.StreamServe(service.WrapStreamListener(proxyListener.AcceptTCP), handler.Handle)
service.StreamServe(service.WrapStreamAcceptFunc(proxyListener.AcceptTCP), handler.Handle)
done <- struct{}{}
}()

Expand Down Expand Up @@ -384,11 +384,11 @@ func BenchmarkTCPThroughput(b *testing.B) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics)
handler := service.NewTCPHandler(authFunc, testMetrics, testTimeout)
handler := service.NewStreamHandler(authFunc, testMetrics, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
go func() {
service.StreamServe(service.WrapStreamListener(proxyListener.AcceptTCP), handler.Handle)
service.StreamServe(service.WrapStreamAcceptFunc(proxyListener.AcceptTCP), handler.Handle)
done <- struct{}{}
}()

Expand Down Expand Up @@ -448,11 +448,11 @@ func BenchmarkTCPMultiplexing(b *testing.B) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics)
handler := service.NewTCPHandler(authFunc, testMetrics, testTimeout)
handler := service.NewStreamHandler(authFunc, testMetrics, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
done := make(chan struct{})
go func() {
service.StreamServe(service.WrapStreamListener(proxyListener.AcceptTCP), handler.Handle)
service.StreamServe(service.WrapStreamAcceptFunc(proxyListener.AcceptTCP), handler.Handle)
done <- struct{}{}
}()

Expand Down
Loading
Loading