Skip to content

Commit

Permalink
Refactor to make packet handler an association handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Jan 21, 2025
1 parent 5704718 commit 5915e47
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 475 deletions.
12 changes: 3 additions & 9 deletions caddy/shadowsocks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ type ShadowsocksHandler struct {
Keys []KeyConfig `json:"keys,omitempty"`

streamHandler outline.StreamHandler
packetHandler outline.PacketHandler
associationHandler outline.AssociationHandler
metrics outline.ServiceMetrics
tgtListener transport.PacketListener
logger *slog.Logger
}

Expand Down Expand Up @@ -106,13 +105,12 @@ func (h *ShadowsocksHandler) Provision(ctx caddy.Context) error {
ciphers := outline.NewCipherList()
ciphers.Update(cipherList)

h.streamHandler, h.packetHandler = outline.NewShadowsocksHandlers(
h.streamHandler, h.associationHandler = outline.NewShadowsocksHandlers(
outline.WithLogger(h.logger),
outline.WithCiphers(ciphers),
outline.WithMetrics(h.metrics),
outline.WithReplayCache(&app.ReplayCache),
)
h.tgtListener = outline.MakeTargetUDPListener(defaultNatTimeout, 0)
return nil
}

Expand All @@ -122,11 +120,7 @@ func (h *ShadowsocksHandler) Handle(cx *layer4.Connection, _ layer4.Handler) err
case transport.StreamConn:
h.streamHandler.HandleStream(cx.Context, conn, h.metrics.AddOpenTCPConnection(conn))
case net.Conn:
assoc, err := outline.NewPacketAssociation(conn, h.tgtListener, h.metrics.AddOpenUDPAssociation(conn))
if err != nil {
return fmt.Errorf("failed to handle association: %v", err)
}
outline.HandleAssociation(assoc, h.packetHandler.HandlePacket)
h.associationHandler.HandleAssociation(cx.Context, conn, h.metrics.AddOpenUDPAssociation(conn))
default:
return fmt.Errorf("failed to handle unknown connection type: %t", conn)
}
Expand Down
30 changes: 10 additions & 20 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,11 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
ciphers := service.NewCipherList()
ciphers.Update(cipherList)

streamHandler, packetHandler := service.NewShadowsocksHandlers(
streamHandler, associationHandler := service.NewShadowsocksHandlers(
service.WithCiphers(ciphers),
service.WithMetrics(s.serviceMetrics),
service.WithReplayCache(&s.replayCache),
service.WithPacketListener(service.MakeTargetUDPListener(s.natTimeout, 0)),
service.WithLogger(slog.Default()),
)
ln, err := lnSet.ListenStream(addr)
Expand All @@ -245,27 +246,22 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
tgtListener := service.MakeTargetUDPListener(s.natTimeout, 0)
go service.PacketServe(pc, func(conn net.Conn) (service.PacketAssociation, error) {
m := s.serviceMetrics.AddOpenUDPAssociation(conn)
assoc, err := service.NewPacketAssociation(conn, tgtListener, m)
if err != nil {
return nil, fmt.Errorf("failed to handle association: %v", err)
}
return assoc, nil
}, packetHandler.HandlePacket, s.serverMetrics)
go service.PacketServe(pc, func(ctx context.Context, conn net.Conn) {
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
}, s.serverMetrics)
}

for _, serviceConfig := range config.Services {
ciphers, err := newCipherListFromConfig(serviceConfig)
if err != nil {
return fmt.Errorf("failed to create cipher list from config: %v", err)
}
streamHandler, packetHandler := service.NewShadowsocksHandlers(
streamHandler, associationHandler := service.NewShadowsocksHandlers(
service.WithCiphers(ciphers),
service.WithMetrics(s.serviceMetrics),
service.WithReplayCache(&s.replayCache),
service.WithStreamDialer(service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, serviceConfig.Dialer.Fwmark)),
service.WithPacketListener(service.MakeTargetUDPListener(s.natTimeout, serviceConfig.Dialer.Fwmark)),
service.WithLogger(slog.Default()),
)
if err != nil {
Expand Down Expand Up @@ -298,15 +294,9 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
}
return serviceConfig.Dialer.Fwmark
}())
tgtListener := service.MakeTargetUDPListener(s.natTimeout, serviceConfig.Dialer.Fwmark)
go service.PacketServe(pc, func(conn net.Conn) (service.PacketAssociation, error) {
m := s.serviceMetrics.AddOpenUDPAssociation(conn)
assoc, err := service.NewPacketAssociation(conn, tgtListener, m)
if err != nil {
return nil, fmt.Errorf("failed to handle association: %v", err)
}
return assoc, nil
}, packetHandler.HandlePacket, s.serverMetrics)
go service.PacketServe(pc, func(ctx context.Context, conn net.Conn) {
associationHandler.HandleAssociation(ctx, conn, s.serviceMetrics.AddOpenUDPAssociation(conn))
}, s.serverMetrics)
}
}
totalCipherCount += len(serviceConfig.Keys)
Expand Down
27 changes: 12 additions & 15 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,14 @@ func TestUDPEcho(t *testing.T) {
if err != nil {
t.Fatal(err)
}
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})
proxy := service.NewAssociationHandler(cipherList, &fakeShadowsocksMetrics{})

proxy.SetTargetIPValidator(allowAll)
natMetrics := &natTestMetrics{}
associationMetrics := &fakeUDPAssociationMetrics{}
go service.PacketServe(proxyConn, func(conn net.Conn) (service.PacketAssociation, error) {
assoc, _ := service.NewPacketAssociation(conn, &transport.UDPListener{Address: ""}, associationMetrics)
return assoc, nil
}, proxy.Handle, natMetrics)
go service.PacketServe(proxyConn, func(ctx context.Context, conn net.Conn) {
proxy.HandleAssociation(ctx, conn, associationMetrics)
}, natMetrics)

cryptoKey, err := shadowsocks.NewEncryptionKey(shadowsocks.CHACHA20IETFPOLY1305, secrets[0])
require.NoError(t, err)
Expand Down Expand Up @@ -546,14 +545,13 @@ func BenchmarkUDPEcho(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})
proxy := service.NewAssociationHandler(cipherList, &fakeShadowsocksMetrics{})
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
service.PacketServe(server, func(conn net.Conn) (service.PacketAssociation, error) {
assoc, _ := service.NewPacketAssociation(conn, &transport.UDPListener{Address: ""}, nil)
return assoc, nil
}, proxy.Handle, &natTestMetrics{})
service.PacketServe(server, func(ctx context.Context, conn net.Conn) {
proxy.HandleAssociation(ctx, conn, &fakeUDPAssociationMetrics{})
}, &natTestMetrics{})
done <- struct{}{}
}()

Expand Down Expand Up @@ -593,14 +591,13 @@ func BenchmarkUDPManyKeys(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})
proxy := service.NewAssociationHandler(cipherList, &fakeShadowsocksMetrics{})
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
service.PacketServe(proxyConn, func(conn net.Conn) (service.PacketAssociation, error) {
assoc, _ := service.NewPacketAssociation(conn, &transport.UDPListener{Address: ""}, nil)
return assoc, nil
}, proxy.Handle, &natTestMetrics{})
service.PacketServe(proxyConn, func(ctx context.Context, conn net.Conn) {
proxy.HandleAssociation(ctx, conn, &fakeUDPAssociationMetrics{})
}, &natTestMetrics{})
done <- struct{}{}
}()

Expand Down
27 changes: 18 additions & 9 deletions service/shadowsocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import (
onet "github.com/Jigsaw-Code/outline-ss-server/net"
)

const (
// 59 seconds is most common timeout for servers that do not respond to invalid requests
tcpReadTimeout time.Duration = 59 * time.Second
)
// 59 seconds is most common timeout for servers that do not respond to invalid requests
const tcpReadTimeout time.Duration = 59 * time.Second

// ShadowsocksConnMetrics is used to report Shadowsocks related metrics on connections.
type ShadowsocksConnMetrics interface {
Expand All @@ -51,11 +49,12 @@ type ssService struct {
targetIPValidator onet.TargetIPValidator
replayCache *ReplayCache

streamDialer transport.StreamDialer
streamDialer transport.StreamDialer
packetListener transport.PacketListener
}

// NewShadowsocksHandlers creates new Shadowsocks stream and packet handlers.
func NewShadowsocksHandlers(opts ...Option) (StreamHandler, PacketHandler) {
func NewShadowsocksHandlers(opts ...Option) (StreamHandler, AssociationHandler) {
s := &ssService{
logger: noopLogger(),
}
Expand All @@ -74,10 +73,13 @@ func NewShadowsocksHandlers(opts ...Option) (StreamHandler, PacketHandler) {
}
sh.SetLogger(s.logger)

ph := NewPacketHandler(s.ciphers, &ssConnMetrics{s.metrics.AddUDPCipherSearch})
ph.SetLogger(s.logger)
ah := NewAssociationHandler(s.ciphers, &ssConnMetrics{s.metrics.AddUDPCipherSearch})
if s.packetListener != nil {
ah.SetTargetPacketListener(s.packetListener)
}
ah.SetLogger(s.logger)

return sh, ph
return sh, ah
}

// WithLogger can be used to provide a custom log target. If not provided,
Expand Down Expand Up @@ -115,6 +117,13 @@ func WithStreamDialer(dialer transport.StreamDialer) Option {
}
}

// WithPacketListener option function.
func WithPacketListener(listener transport.PacketListener) Option {
return func(s *ssService) {
s.packetListener = listener
}
}

type ssConnMetrics struct {
metricFunc func(accessKeyFound bool, timeToCipher time.Duration)
}
Expand Down
8 changes: 4 additions & 4 deletions service/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func TestProbeClientBytesBasicTruncated(t *testing.T) {
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, 200*time.Millisecond)
handler.SetTargetDialerStream(MakeValidatingTCPStreamDialer(allowAll, 0))
handler.SetTargetDialer(MakeValidatingTCPStreamDialer(allowAll, 0))
done := make(chan struct{})
go func() {
StreamServe(
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestProbeClientBytesBasicModified(t *testing.T) {
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, 200*time.Millisecond)
handler.SetTargetDialerStream(MakeValidatingTCPStreamDialer(allowAll, 0))
handler.SetTargetDialer(MakeValidatingTCPStreamDialer(allowAll, 0))
done := make(chan struct{})
go func() {
StreamServe(
Expand Down Expand Up @@ -445,7 +445,7 @@ func TestProbeClientBytesCoalescedModified(t *testing.T) {
testMetrics := &probeTestMetrics{}
authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}, nil)
handler := NewStreamHandler(authFunc, 200*time.Millisecond)
handler.SetTargetDialerStream(MakeValidatingTCPStreamDialer(allowAll, 0))
handler.SetTargetDialer(MakeValidatingTCPStreamDialer(allowAll, 0))
done := make(chan struct{})
go func() {
StreamServe(
Expand Down Expand Up @@ -747,7 +747,7 @@ func TestStreamServeEarlyClose(t *testing.T) {
err = tcpListener.Close()
require.NoError(t, err)
// This should return quickly, without timing out or calling the handler.
StreamServeStream(WrapStreamAcceptFunc(tcpListener.AcceptTCP), nil)
StreamServe(WrapStreamAcceptFunc(tcpListener.AcceptTCP), nil)
}

// Makes sure the TCP listener returns [io.ErrClosed] on Close().
Expand Down
Loading

0 comments on commit 5915e47

Please sign in to comment.