forked from acrosa/scala-redis
-
Notifications
You must be signed in to change notification settings - Fork 218
/
Copy pathIO.scala
125 lines (108 loc) · 2.87 KB
/
IO.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package com.redis
import java.io._
import java.net.{InetAddress, Socket, InetSocketAddress}
import serialization.Parse.parseStringSafe
trait IO extends Log {
val addr: NodeAddress
def host: String = addr.addr._1
def port: Int = addr.addr._2
addr onChange onAddrChange
@volatile var socket: Socket = _
var out: OutputStream = _
var in: InputStream = _
var db: Int = _
def connected = {
socket != null && socket.isBound() && !socket.isClosed() && socket.isConnected() && !socket.isInputShutdown() && !socket.isOutputShutdown();
}
def reconnect = {
disconnect && connect
}
protected def onAddrChange(addr: InetSocketAddress) {
val sock = socket
if (sock != null && sock.getRemoteSocketAddress != addr) {
sock.close() // just close the socket (pretend the server closed it)
}
}
// Connects the socket, and sets the input and output streams.
def connect: Boolean = {
try {
val (host, port) = addr.addr
socket = new Socket(host, port)
socket.setSoTimeout(0)
socket.setKeepAlive(true)
socket.setTcpNoDelay(true)
out = socket.getOutputStream
in = new BufferedInputStream(socket.getInputStream)
true
} catch {
case x: Throwable =>
clearFd
throw new RuntimeException(x)
}
}
// Disconnects the socket.
def disconnect: Boolean = {
try {
socket.close
out.close
in.close
clearFd
true
} catch {
case x: Throwable =>
false
}
}
def clearFd = {
socket = null
out = null
in = null
}
// Wrapper for the socket write operation.
def write_to_socket(data: Array[Byte])(op: OutputStream => Unit) = op(out)
// Writes data to a socket using the specified block.
def write(data: Array[Byte]) = {
ifDebug("C: " + parseStringSafe(data))
if (!connected) connect;
write_to_socket(data){ os =>
try {
os.write(data)
os.flush
} catch {
case x: Throwable => throw new RedisConnectionException("connection is closed. write error")
}
}
}
private val crlf = List(13,10)
def readLine: Array[Byte] = {
if(!connected) connect
var delimiter = crlf
var found: List[Int] = Nil
var build = new scala.collection.mutable.ArrayBuilder.ofByte
while (delimiter != Nil) {
val next = in.read
if (next < 0) return null
if (next == delimiter.head) {
found ::= delimiter.head
delimiter = delimiter.tail
} else {
if (found != Nil) {
delimiter = crlf
build ++= found.reverseMap(_.toByte)
found = Nil
}
build += next.toByte
}
}
build.result
}
def readCounted(count: Int): Array[Byte] = {
if(!connected) connect
val arr = new Array[Byte](count)
var cur = 0
while (cur < count) {
cur += in.read(arr, cur, count - cur)
}
arr
}
}