Skip to content

Commit f49c40d

Browse files
committed
Merge branch 'master' into sbruens/ip-key-metrics
2 parents db15a2f + b9cb68e commit f49c40d

File tree

2 files changed

+74
-32
lines changed

2 files changed

+74
-32
lines changed

service/metrics/metrics.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,13 @@ func (c *measuredConn) Write(b []byte) (int, error) {
5353
return n, err
5454
}
5555

56-
func (c *measuredConn) ReadFrom(r io.Reader) (int64, error) {
57-
n, err := io.Copy(c.StreamConn, r)
56+
func (c *measuredConn) ReadFrom(r io.Reader) (n int64, err error) {
57+
if rf, ok := c.StreamConn.(io.ReaderFrom); ok {
58+
// Prefer ReadFrom if we are calling ReadFrom. Otherwise io.Copy will try WriteTo first.
59+
n, err = rf.ReadFrom(r)
60+
} else {
61+
n, err = io.Copy(c.StreamConn, r)
62+
}
5863
*c.writeCount += n
5964
return n, err
6065
}

service/tcp.go

+67-30
Original file line numberDiff line numberDiff line change
@@ -235,25 +235,22 @@ func (h *tcpHandler) Handle(ctx context.Context, clientConn transport.StreamConn
235235
logger.Debugf("Done with status %v, duration %v", status, connDuration)
236236
}
237237

238-
func (h *tcpHandler) handleConnection(ctx context.Context, listenerPort int, clientConn transport.StreamConn, proxyMetrics *metrics.ProxyMetrics) (string, *onet.ConnectionError) {
239-
// Set a deadline to receive the address to the target.
240-
clientConn.SetReadDeadline(time.Now().Add(h.readTimeout))
241-
242-
// 1. Find the cipher and acess key id.
238+
func (h *tcpHandler) authenticate(clientConn transport.StreamConn, proxyMetrics *metrics.ProxyMetrics) (string, transport.StreamConn, *onet.ConnectionError) {
239+
// TODO(fortuna): Offer alternative transports.
240+
// Find the cipher and acess key id.
243241
cipherEntry, clientReader, clientSalt, timeToCipher, keyErr := findAccessKey(clientConn, remoteIP(clientConn), h.ciphers)
244242
h.m.AddTCPCipherSearch(keyErr == nil, timeToCipher)
245243
if keyErr != nil {
246244
logger.Debugf("Failed to find a valid cipher after reading %v bytes: %v", proxyMetrics.ClientProxy, keyErr)
247245
const status = "ERR_CIPHER"
248-
h.absorbProbe(listenerPort, clientConn, status, proxyMetrics)
249-
return "", onet.NewConnectionError(status, "Failed to find a valid cipher", keyErr)
246+
return "", nil, onet.NewConnectionError(status, "Failed to find a valid cipher", keyErr)
250247
}
251248
var id string
252249
if cipherEntry != nil {
253250
id = cipherEntry.ID
254251
}
255252

256-
// 2. Check if the connection is a replay.
253+
// Check if the connection is a replay.
257254
isServerSalt := cipherEntry.SaltGenerator.IsServerSalt(clientSalt)
258255
// Only check the cache if findAccessKey succeeded and the salt is unrecognized.
259256
if isServerSalt || !h.replayCache.Add(cipherEntry.ID, clientSalt) {
@@ -263,40 +260,41 @@ func (h *tcpHandler) handleConnection(ctx context.Context, listenerPort int, cli
263260
} else {
264261
status = "ERR_REPLAY_CLIENT"
265262
}
266-
h.absorbProbe(listenerPort, clientConn, status, proxyMetrics)
267263
logger.Debugf(status+": %v sent %d bytes", clientConn.RemoteAddr(), proxyMetrics.ClientProxy)
268-
return id, onet.NewConnectionError(status, "Replay detected", nil)
264+
return id, nil, onet.NewConnectionError(status, "Replay detected", nil)
269265
}
270266

271267
h.m.AddAuthenticatedTCPConnection(clientConn.RemoteAddr(), id)
272-
273-
// 3. Read target address and dial it.
274268
ssr := shadowsocks.NewReader(clientReader, cipherEntry.CryptoKey)
275-
tgtAddr, err := socks.ReadAddr(ssr)
269+
ssw := shadowsocks.NewWriter(clientConn, cipherEntry.CryptoKey)
270+
ssw.SetSaltGenerator(cipherEntry.SaltGenerator)
271+
return id, transport.WrapConn(clientConn, ssr, ssw), nil
272+
}
276273

277-
// Clear the deadline for the target address
278-
clientConn.SetReadDeadline(time.Time{})
274+
func getProxyRequest(clientConn transport.StreamConn) (string, error) {
275+
// TODO(fortuna): Use Shadowsocks proxy, HTTP CONNECT or SOCKS5 based on first byte:
276+
// case 1, 3 or 4: Shadowsocks (address type)
277+
// case 5: SOCKS5 (protocol version)
278+
// case "C": HTTP CONNECT (first char of method)
279+
tgtAddr, err := socks.ReadAddr(clientConn)
279280
if err != nil {
280-
// Drain to prevent a close on cipher error.
281-
io.Copy(io.Discard, clientConn)
282-
return id, onet.NewConnectionError("ERR_READ_ADDRESS", "Failed to get target address", err)
281+
return "", err
283282
}
284-
tgtConn, dialErr := h.dialer.DialStream(ctx, tgtAddr.String())
283+
return tgtAddr.String(), nil
284+
}
285+
286+
func proxyConnection(ctx context.Context, dialer transport.StreamDialer, tgtAddr string, clientConn transport.StreamConn) *onet.ConnectionError {
287+
tgtConn, dialErr := dialer.DialStream(ctx, tgtAddr)
285288
if dialErr != nil {
286289
// We don't drain so dial errors and invalid addresses are communicated quickly.
287-
return id, ensureConnectionError(dialErr, "ERR_CONNECT", "Failed to connect to target")
290+
return ensureConnectionError(dialErr, "ERR_CONNECT", "Failed to connect to target")
288291
}
289-
tgtConn = metrics.MeasureConn(tgtConn, &proxyMetrics.ProxyTarget, &proxyMetrics.TargetProxy)
290292
defer tgtConn.Close()
291-
292-
// 4. Bridge the client and target connections
293293
logger.Debugf("proxy %s <-> %s", clientConn.RemoteAddr().String(), tgtConn.RemoteAddr().String())
294-
ssw := shadowsocks.NewWriter(clientConn, cipherEntry.CryptoKey)
295-
ssw.SetSaltGenerator(cipherEntry.SaltGenerator)
296294

297295
fromClientErrCh := make(chan error)
298296
go func() {
299-
_, fromClientErr := ssr.WriteTo(tgtConn)
297+
_, fromClientErr := io.Copy(tgtConn, clientConn)
300298
if fromClientErr != nil {
301299
// Drain to prevent a close in the case of a cipher error.
302300
io.Copy(io.Discard, clientConn)
@@ -308,19 +306,58 @@ func (h *tcpHandler) handleConnection(ctx context.Context, listenerPort int, cli
308306
tgtConn.CloseWrite()
309307
fromClientErrCh <- fromClientErr
310308
}()
311-
_, fromTargetErr := ssw.ReadFrom(tgtConn)
309+
_, fromTargetErr := io.Copy(clientConn, tgtConn)
312310
// Send FIN to client.
313311
clientConn.CloseWrite()
314312
tgtConn.CloseRead()
315313

316314
fromClientErr := <-fromClientErrCh
317315
if fromClientErr != nil {
318-
return id, onet.NewConnectionError("ERR_RELAY_CLIENT", "Failed to relay traffic from client", fromClientErr)
316+
return onet.NewConnectionError("ERR_RELAY_CLIENT", "Failed to relay traffic from client", fromClientErr)
319317
}
320318
if fromTargetErr != nil {
321-
return id, onet.NewConnectionError("ERR_RELAY_TARGET", "Failed to relay traffic from target", fromTargetErr)
319+
return onet.NewConnectionError("ERR_RELAY_TARGET", "Failed to relay traffic from target", fromTargetErr)
322320
}
323-
return id, nil
321+
return nil
322+
}
323+
324+
func (h *tcpHandler) handleConnection(ctx context.Context, listenerPort int, outerConn transport.StreamConn, proxyMetrics *metrics.ProxyMetrics) (string, *onet.ConnectionError) {
325+
// Set a deadline to receive the address to the target.
326+
readDeadline := time.Now().Add(h.readTimeout)
327+
if deadline, ok := ctx.Deadline(); ok {
328+
outerConn.SetDeadline(deadline)
329+
if deadline.Before(readDeadline) {
330+
readDeadline = deadline
331+
}
332+
}
333+
outerConn.SetReadDeadline(readDeadline)
334+
335+
id, innerConn, authErr := h.authenticate(outerConn, proxyMetrics)
336+
if authErr != nil {
337+
// Drain to protect against probing attacks.
338+
h.absorbProbe(listenerPort, outerConn, authErr.Status, proxyMetrics)
339+
return id, authErr
340+
}
341+
342+
// Read target address and dial it.
343+
tgtAddr, err := getProxyRequest(innerConn)
344+
// Clear the deadline for the target address
345+
outerConn.SetReadDeadline(time.Time{})
346+
if err != nil {
347+
// Drain to prevent a close on cipher error.
348+
io.Copy(io.Discard, outerConn)
349+
return id, onet.NewConnectionError("ERR_READ_ADDRESS", "Failed to get target address", err)
350+
}
351+
352+
dialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) {
353+
tgtConn, err := h.dialer.DialStream(ctx, tgtAddr)
354+
if err != nil {
355+
return nil, err
356+
}
357+
tgtConn = metrics.MeasureConn(tgtConn, &proxyMetrics.ProxyTarget, &proxyMetrics.TargetProxy)
358+
return tgtConn, nil
359+
})
360+
return id, proxyConnection(ctx, dialer, tgtAddr, innerConn)
324361
}
325362

326363
// Keep the connection open until we hit the authentication deadline to protect against probing attacks

0 commit comments

Comments
 (0)