-
Notifications
You must be signed in to change notification settings - Fork 22
/
udp.go
222 lines (197 loc) · 7.87 KB
/
udp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/**
* Filename: udp.go
* Description: the PortForward udp layer implement.
* Author: knownsec404
* Time: 2020.09.23
*/
package main
import (
"errors"
"net"
"time"
)
// as UDP client Conn
type UDPDistribute struct {
Established bool
Conn *(net.UDPConn)
RAddr net.Addr
Cache chan []byte
}
/**********************************************************************
* @Function: NewUDPDistribute(conn *(net.UDPConn), addr net.Addr) (*UDPDistribute)
* @Description: initialize UDPDistribute structure (as UDP client Conn)
* @Parameter: conn *(net.UDPConn), the udp connection object
* @Parameter: addr net.Addr, the udp client remote adddress
* @Return: *UDPDistribute, the new UDPDistribute structure pointer
**********************************************************************/
func NewUDPDistribute(conn *(net.UDPConn), addr net.Addr) (*UDPDistribute) {
return &UDPDistribute{
Established: true,
Conn: conn,
RAddr: addr,
Cache: make(chan []byte, 16),
}
}
/**********************************************************************
* @Function: (this *UDPDistribute) Close() (error)
* @Description: set "Established" flag is false, the UDP service will
* cleaned up according to certain conditions.
* @Parameter: nil
* @Return: error, the error
**********************************************************************/
func (this *UDPDistribute) Close() (error) {
this.Established = false
return nil
}
/**********************************************************************
* @Function: (this *UDPDistribute) Read(b []byte) (n int, err error)
* @Description: read data from connection, due to the udp implementation of
* PortForward, read here will only produce a timeout error and closed error
* (compared to the normal net.Conn object)
* @Parameter: b []byte, the buffer for receive data
* @Return: (n int, err error), the length of the data read and error
**********************************************************************/
func (this *UDPDistribute) Read(b []byte) (n int, err error) {
if !this.Established {
return 0, errors.New("udp distrubute has closed")
}
select {
case <-time.After(16 * time.Second):
return 0, errors.New("udp distrubute read timeout")
case data := <-this.Cache:
n := len(data)
copy(b, data)
return n, nil
}
}
/**********************************************************************
* @Function: (this *UDPDistribute) Write(b []byte) (n int, err error)
* @Description: write data to connection by "WriteTo()"
* @Parameter: b []byte, the data to be sent
* @Return: (n int, err error), the length of the data write and error
**********************************************************************/
func (this *UDPDistribute) Write(b []byte) (n int, err error) {
if !this.Established {
return 0, errors.New("udp distrubute has closed")
}
return this.Conn.WriteTo(b, this.RAddr)
}
/**********************************************************************
* @Function: (this *UDPDistribute) RemoteAddr() (net.Addr)
* @Description: get remote address
* @Parameter: nil
* @Return: net.Addr, the remote address
**********************************************************************/
func (this *UDPDistribute) RemoteAddr() (net.Addr) {
return this.RAddr
}
/**********************************************************************
* @Function: ListenUDP(address string, clientc chan Conn, quit chan bool)
* @Description: listen local udp service, and accept client connection,
* initialize connection and return by channel.
* since udp is running as a service, it only obtains remote data through
* the Read* function cluster (different from tcp), so we need a temporary
* table to record, so that we can use the temporary table to determine
* whether to forward or create a new link
* @Parameter: address string, the local listen address
* @Parameter: clientc chan Conn, new client connection channel
* @Parameter: quit chan bool, the quit signal channel
* @Return: nil
**********************************************************************/
func ListenUDP(address string, clientc chan Conn, quit chan bool) {
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
LogError("udp listen error, %s", err)
clientc <- nil
return
}
serv, err := net.ListenUDP("udp", addr)
if err != nil {
LogError("udp listen error, %s", err)
clientc <- nil
return
}
defer serv.Close()
// the udp distrubute table
table := make(map[string]*UDPDistribute)
// NOTICE:
// in the process of running, the table will generate invalid historical
// data, we have not cleaned it up(These invalid historical data will not
// affect our normal operation).
//
// if you want to deal with invalid historical data, the best way is to
// launch a new coroutine to handle net.Read, and another coroutine to
// handle the connection status, but additional logic to exit the
// coroutine is needed. Currently we think it is unnecessary.
//
// under the current code logic: if a udp connection exits, it will timeout
// to trigger "Close()", and finally set "Established" to false, but there
// is no logic to check this state to clean up, so invalid historical data
// is generated (of course, we can traverse to clean up? every packet? no)
//
// PS: you can see that we called "delete()" in the following logic, but
// this is only for processing. when a new udp client hits invalid
// historical data, we need to return a new connection so that PortForward
// can create a communication link.
for {
// check quit
select {
case <-quit:
return
default:
}
// set timeout, for check "quit" signal
serv.SetDeadline(time.Now().Add(16 * time.Second))
// just new 32*1024, in the outer layer we used "io.Copy()", which
// can only handle the size of 32*1024
buf := make([]byte, 32 * 1024)
n, addr, err := serv.ReadFrom(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
continue
}
LogError("udp listen error, %s", err)
clientc <- nil
return
}
buf = buf[:n]
// if the address in table, we distrubute message
if d, ok := table[addr.String()]; ok {
if d.Established {
// it is established, distrubute message
d.Cache <- buf
continue
} else {
// we remove it when the connnection has expired
delete(table, addr.String())
}
}
// if the address not in table, we create new connection object
conn := NewUDPDistribute(serv, addr)
table[addr.String()] = conn
conn.Cache <- buf
clientc <- conn
} // end for
}
/**********************************************************************
* @Function: ConnUDP(address string) (Conn, error)
* @Description: dial to remote server, and return udp connection
* @Parameter: address string, the remote server address that needs to be dialed
* @Return: (Conn, error), the udp connection and error
**********************************************************************/
func ConnUDP(address string) (Conn, error) {
conn, err := net.DialTimeout("udp", address, 10 * time.Second)
if err != nil {
return nil, err
}
// send one byte(knock) to server, get "established" udp connection
_, err = conn.Write([]byte("\x00"))
if err != nil {
return nil, err
}
// due to the characteristics of udp, when the udp server exits, we will
// not receive any signal, it will be blocked at conn.Read();
// here we set a timeout for udp
conn.SetDeadline(time.Now().Add(60 * time.Second))
return conn, nil
}