Skip to content

Commit

Permalink
revert commit 2321e91
Browse files Browse the repository at this point in the history
  • Loading branch information
wwqgtxx committed Nov 22, 2020
1 parent 1dc064d commit bce3f01
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 13 deletions.
12 changes: 7 additions & 5 deletions common/observable/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,34 @@ package observable

import (
"sync"

"gopkg.in/eapache/channels.v1"
)

type Subscription <-chan interface{}

type Subscriber struct {
buffer chan interface{}
buffer *channels.InfiniteChannel
once sync.Once
}

func (s *Subscriber) Emit(item interface{}) {
s.buffer <- item
s.buffer.In() <- item
}

func (s *Subscriber) Out() Subscription {
return s.buffer
return s.buffer.Out()
}

func (s *Subscriber) Close() {
s.once.Do(func() {
close(s.buffer)
s.buffer.Close()
})
}

func newSubscriber() *Subscriber {
sub := &Subscriber{
buffer: make(chan interface{}, 200),
buffer: channels.NewInfiniteChannel(),
}
return sub
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.15

require (
github.com/Dreamacro/go-shadowsocks2 v0.1.6
github.com/eapache/queue v1.1.0 // indirect
github.com/go-chi/chi v4.1.2+incompatible
github.com/go-chi/cors v1.1.1
github.com/go-chi/render v1.0.1
Expand All @@ -18,6 +19,7 @@ require (
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
gopkg.in/eapache/channels.v1 v1.1.0
gopkg.in/yaml.v2 v2.3.0
)

Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSi
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec=
github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-chi/cors v1.1.1 h1:eHuqxsIw89iXcWnWUN8R72JMibABJTN/4IOYI5WERvw=
Expand Down Expand Up @@ -66,6 +68,8 @@ golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapK
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/eapache/channels.v1 v1.1.0 h1:5bGAyKKvyCTWjSj7mhefG6Lc68VyN4MH1v8/7OoeeB4=
gopkg.in/eapache/channels.v1 v1.1.0/go.mod h1:BHIBujSvu9yMTrTYbTCjDD43gUhtmaOtTWDe7sTv1js=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
20 changes: 12 additions & 8 deletions tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
"github.com/Dreamacro/clash/component/resolver"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/log"

channels "gopkg.in/eapache/channels.v1"
)

var (
tcpQueue = make(chan C.ServerAdapter, 200)
udpQueue = make(chan *inbound.PacketAdapter, 200)
tcpQueue = channels.NewInfiniteChannel()
udpQueue = channels.NewInfiniteChannel()
natTable = nat.New()
rules []C.Rule
proxies = make(map[string]C.Proxy)
Expand All @@ -37,13 +39,13 @@ func init() {

// Add request to queue
func Add(req C.ServerAdapter) {
tcpQueue <- req
tcpQueue.In() <- req
}

// AddPacket add udp Packet to queue
func AddPacket(packet *inbound.PacketAdapter) {
select {
case udpQueue <- packet:
case udpQueue.In() <- packet:
default:
}
}
Expand Down Expand Up @@ -90,8 +92,9 @@ func SetMode(m TunnelMode) {

// processUDP starts a loop to handle udp packet
func processUDP() {
queue := udpQueue
for conn := range queue {
queue := udpQueue.Out()
for elm := range queue {
conn := elm.(*inbound.PacketAdapter)
handleUDPConn(conn)
}
}
Expand All @@ -105,8 +108,9 @@ func process() {
go processUDP()
}

queue := tcpQueue
for conn := range queue {
queue := tcpQueue.Out()
for elm := range queue {
conn := elm.(C.ServerAdapter)
go handleTCPConn(conn)
}
}
Expand Down

0 comments on commit bce3f01

Please sign in to comment.