From bce3f019057e9b07641d2166f58c45e85aa03ef9 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Sun, 22 Nov 2020 17:54:56 +0800 Subject: [PATCH] revert commit 2321e9139d28d56cc4009e63e67aa69b621faca9 --- common/observable/subscriber.go | 12 +++++++----- go.mod | 2 ++ go.sum | 4 ++++ tunnel/tunnel.go | 20 ++++++++++++-------- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/common/observable/subscriber.go b/common/observable/subscriber.go index cb2a70f42e..3fb1e587d2 100644 --- a/common/observable/subscriber.go +++ b/common/observable/subscriber.go @@ -2,32 +2,34 @@ package observable import ( "sync" + + "gopkg.in/eapache/channels.v1" ) type Subscription <-chan interface{} type Subscriber struct { - buffer chan interface{} + buffer *channels.InfiniteChannel once sync.Once } func (s *Subscriber) Emit(item interface{}) { - s.buffer <- item + s.buffer.In() <- item } func (s *Subscriber) Out() Subscription { - return s.buffer + return s.buffer.Out() } func (s *Subscriber) Close() { s.once.Do(func() { - close(s.buffer) + s.buffer.Close() }) } func newSubscriber() *Subscriber { sub := &Subscriber{ - buffer: make(chan interface{}, 200), + buffer: channels.NewInfiniteChannel(), } return sub } diff --git a/go.mod b/go.mod index 69f20dca1c..b2526549bb 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( github.com/Dreamacro/go-shadowsocks2 v0.1.6 + github.com/eapache/queue v1.1.0 // indirect github.com/go-chi/chi v4.1.2+incompatible github.com/go-chi/cors v1.1.1 github.com/go-chi/render v1.0.1 @@ -18,6 +19,7 @@ require ( golang.org/x/net v0.0.0-20201110031124-69a78807bb2b golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 + gopkg.in/eapache/channels.v1 v1.1.0 gopkg.in/yaml.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index 6e07fb3d16..097c202c6f 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSi github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/cors v1.1.1 h1:eHuqxsIw89iXcWnWUN8R72JMibABJTN/4IOYI5WERvw= @@ -66,6 +68,8 @@ golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapK golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/eapache/channels.v1 v1.1.0 h1:5bGAyKKvyCTWjSj7mhefG6Lc68VyN4MH1v8/7OoeeB4= +gopkg.in/eapache/channels.v1 v1.1.0/go.mod h1:BHIBujSvu9yMTrTYbTCjDD43gUhtmaOtTWDe7sTv1js= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 8678d47c7a..999b4b07ac 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -13,11 +13,13 @@ import ( "github.com/Dreamacro/clash/component/resolver" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/log" + + channels "gopkg.in/eapache/channels.v1" ) var ( - tcpQueue = make(chan C.ServerAdapter, 200) - udpQueue = make(chan *inbound.PacketAdapter, 200) + tcpQueue = channels.NewInfiniteChannel() + udpQueue = channels.NewInfiniteChannel() natTable = nat.New() rules []C.Rule proxies = make(map[string]C.Proxy) @@ -37,13 +39,13 @@ func init() { // Add request to queue func Add(req C.ServerAdapter) { - tcpQueue <- req + tcpQueue.In() <- req } // AddPacket add udp Packet to queue func AddPacket(packet *inbound.PacketAdapter) { select { - case udpQueue <- packet: + case udpQueue.In() <- packet: default: } } @@ -90,8 +92,9 @@ func SetMode(m TunnelMode) { // processUDP starts a loop to handle udp packet func processUDP() { - queue := udpQueue - for conn := range queue { + queue := udpQueue.Out() + for elm := range queue { + conn := elm.(*inbound.PacketAdapter) handleUDPConn(conn) } } @@ -105,8 +108,9 @@ func process() { go processUDP() } - queue := tcpQueue - for conn := range queue { + queue := tcpQueue.Out() + for elm := range queue { + conn := elm.(C.ServerAdapter) go handleTCPConn(conn) } }