forked from acrosa/scala-redis
-
Notifications
You must be signed in to change notification settings - Fork 218
/
Copy pathRedisClient.scala
169 lines (148 loc) · 5.09 KB
/
RedisClient.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package com.redis
import serialization.Format
import java.net.SocketException
/**
 * Companion holding the small marker hierarchies used as command options.
 *
 * NOTE(review): the names suggest `SortOrder` is a sort direction and `Aggregate` a
 * score-aggregation mode for commands defined elsewhere -- confirm against the callers.
 */
object RedisClient {

  // ---- sort direction markers -------------------------------------------------------------

  /** Marker for a sort direction; implemented by [[ASC]] and [[DESC]]. */
  trait SortOrder

  /** Ascending variant of [[SortOrder]]. */
  case object ASC extends SortOrder

  /** Descending variant of [[SortOrder]]. */
  case object DESC extends SortOrder

  // ---- aggregation markers ----------------------------------------------------------------

  /** Marker for an aggregation mode; implemented by [[SUM]], [[MIN]] and [[MAX]]. */
  trait Aggregate

  /** Sum variant of [[Aggregate]]. */
  case object SUM extends Aggregate

  /** Minimum variant of [[Aggregate]]. */
  case object MIN extends Aggregate

  /** Maximum variant of [[Aggregate]]. */
  case object MAX extends Aggregate
}
/**
 * Low-level command plumbing shared by every client flavour: a command is encoded with
 * `Commands.multiBulk`, written to the connection, and then the caller-supplied reply
 * reader is evaluated.
 */
trait Redis extends IO with Protocol {

  /**
   * Sends `command` followed by `args` and then evaluates `result`.
   *
   * If a `RedisConnectionException` or `SocketException` is raised while writing or while
   * evaluating `result`, `reconnect` is invoked; when it returns `true` the whole send is
   * attempted again (there is no retry limit), otherwise the exception is rethrown.
   *
   * @param command command name, encoded as UTF-8
   * @param args    command arguments, each converted to bytes through `format`
   * @param result  by-name reply reader, evaluated after the request has been written
   * @param format  serializer applied to every element of `args`
   * @return whatever `result` evaluates to
   */
  def send[A](command: String, args: Seq[Any])(result: => A)(implicit format: Format): A =
    retryingAfterReconnect[A] {
      val commandName = command.getBytes("UTF-8")
      val request = commandName +: args.map(format.apply)
      write(Commands.multiBulk(request))
      result
    } {
      send(command, args)(result)
    }

  /**
   * Argument-less variant of `send`: writes just the command name, then evaluates `result`.
   * The reconnect-and-retry behaviour is the same as for the overload taking arguments.
   *
   * @param command command name, encoded as UTF-8
   * @param result  by-name reply reader, evaluated after the request has been written
   * @return whatever `result` evaluates to
   */
  def send[A](command: String)(result: => A): A =
    retryingAfterReconnect[A] {
      val request = List(command.getBytes("UTF-8"))
      write(Commands.multiBulk(request))
      result
    } {
      send(command)(result)
    }

  /** Encodes already-serialized arguments with `Commands.multiBulk` without sending them. */
  def cmd(args: Seq[Array[Byte]]) = Commands.multiBulk(args)

  /** Flattens a collection of pairs into `List(k1, v1, k2, v2, ...)`, preserving order. */
  protected def flattenPairs(in: Iterable[Product2[Any, Any]]): List[Any] =
    (for {
      pair    <- in.iterator
      element <- Iterator(pair._1, pair._2)
    } yield element).toList

  /**
   * Evaluates `attempt`; on a connection-level failure asks `reconnect` whether the link is
   * back and, if so, evaluates `retry` instead. Any other outcome is passed through untouched.
   */
  private def retryingAfterReconnect[A](attempt: => A)(retry: => A): A =
    try attempt
    catch {
      case e @ (_: RedisConnectionException | _: SocketException) =>
        if (reconnect) retry else throw e
    }
}
/**
 * Aggregates the command surface of a client: the [[Redis]] send/reconnect plumbing plus
 * every operation-group trait mixed in below.
 *
 * The operation traits themselves are defined elsewhere; the mixin order here determines
 * trait linearization, so it should not be reordered casually.
 */
trait RedisCommand extends Redis
with Operations
with NodeOperations
with StringOperations
with ListOperations
with SetOperations
with SortedSetOperations
with HashOperations
with EvalOperations
/**
 * Client bound to a single node address. The connection is opened as part of construction.
 *
 * @param addr address of the node this client talks to
 */
class RedisClient(val addr: NodeAddress)
  extends RedisCommand with PubSub {

  // Connect eagerly so that a freshly constructed client is immediately usable.
  connect

  /** Creates a client for a fixed `host`/`port` pair. */
  def this(host: String, port: Int) = this(new FixedAddress(host, port))

  /** Creates a client for `localhost:6379`. */
  def this() = this("localhost", 6379)

  /** Renders the client as `host:port`. */
  override def toString = s"$host:$port"

  /**
   * Runs `f` between `MULTI` and `EXEC`.
   *
   * Commands issued on the supplied [[PipelineClient]] are written immediately, but their
   * reply readers are only recorded; they are handed to `asExec` once `EXEC` has been sent.
   *
   * If `f` throws an `Exception`, `DISCARD` is sent and the exception is rethrown.
   *
   * @param f block issuing commands on the pipeline client; its return value is ignored
   * @return the result of `asExec` over the recorded reply readers, or `None` when a
   *         `RedisMultiExecException` is raised
   */
  def pipeline(f: PipelineClient => Any): Option[List[Any]] = {
    send("MULTI")(asString) // flush reply stream
    try {
      val pipelineClient = new PipelineClient(this)
      try {
        f(pipelineClient)
      } catch {
        case e: Exception =>
          // Abandon the transaction on the server before propagating the failure.
          send("DISCARD")(asString)
          throw e
      }
      send("EXEC")(asExec(pipelineClient.handlers))
    } catch {
      case _: RedisMultiExecException =>
        None
    }
  }

  import serialization.Parse
  import scala.concurrent.{Promise, Future}
  // NOTE(review): a library normally lets the caller choose the ExecutionContext; the global
  // one is kept here because changing it would alter the public signature of pipelineNoMulti.
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.util.{Failure, Success, Try}

  /**
   * Redis pipelining API without the transaction semantics. The call is non-blocking: it
   * returns one `Promise` per command right away and executes the commands, in order, on a
   * background `Future`. The caller may use `Future.firstCompletedOf` to get the first
   * completed task before all tasks have been completed.
   *
   * If a command throws a `java.lang.Exception`, the corresponding `Promise` is completed
   * ''successfully'' with that exception as its value (so awaiting it yields the exception
   * instead of throwing). Any other non-fatal `Throwable` fails the promise. A failing
   * command never prevents the following commands from running. Here's a sample usage:
   *
   * {{{
   * val x =
   *   r.pipelineNoMulti(
   *     List(
   *       {() => r.set("key", "debasish")},
   *       {() => r.get("key")},
   *       {() => r.get("key1")},
   *       {() => r.lpush("list", "maulindu")},
   *       {() => r.lpush("key", "maulindu")} // should raise an exception
   *     )
   *   )
   * }}}
   *
   * This queues up all commands and does pipelining. The returned `x` is a `List` of
   * `Promise`. The client may want to wait for all to complete using:
   *
   * {{{
   * val result = x.map{a => Await.result(a.future, timeout)}
   * }}}
   *
   * Or the client may wish to track and get the promises as soon as the underlying `Future`
   * is completed.
   *
   * @param commands thunks to execute, in order
   * @return one promise per command, in the same order as `commands`
   */
  def pipelineNoMulti(commands: Seq[() => Any]): List[Promise[Any]] = {
    val promises = List.fill(commands.size)(Promise[Any]())
    Future {
      // Pair every command with its own promise instead of indexing through shared mutable
      // state, and complete each promise exactly once. `foreach` is strict, so every command
      // runs even when `commands` is a lazy sequence.
      commands.zip(promises).foreach { case (command, promise) =>
        Try(command()) match {
          case Success(value)                   => promise.success(value)
          case Failure(ex: java.lang.Exception) => promise.success(ex)
          case Failure(other)                   => promise.failure(other)
        }
      }
    }
    promises
  }

  /**
   * Command client used inside [[pipeline]]. All I/O is delegated to `parent`; every `send`
   * writes the command, records the reply reader in `handlers` for later evaluation and
   * returns a placeholder value.
   *
   * @param parent client whose connection is used for all reads and writes
   */
  class PipelineClient(parent: RedisClient) extends RedisCommand {

    /** Reply readers of the commands queued so far, in the order they were sent. */
    var handlers: Vector[() => Any] = Vector.empty

    /**
     * Writes the command and records `result` instead of evaluating it.
     *
     * @return always `null` cast to `A`; the real reply only becomes available through the
     *         enclosing `pipeline` call
     */
    override def send[A](command: String, args: Seq[Any])(result: => A)(implicit format: Format): A =
      enqueue(Commands.multiBulk(command.getBytes("UTF-8") +: (args map (format.apply))))(result)

    /** Argument-less variant; see the overload taking `args`. */
    override def send[A](command: String)(result: => A): A =
      enqueue(Commands.multiBulk(List(command.getBytes("UTF-8"))))(result)

    /** Shared body of both `send` overloads: write, remember the reply reader, drain one reply. */
    private def enqueue[A](request: Array[Byte])(result: => A): A = {
      write(request)
      handlers :+= (() => result)
      // Consume the immediate single-line reply (presumably the server's QUEUED
      // acknowledgement -- confirm) so it does not get mixed up with the EXEC reply.
      receive(singleLineReply).map(Parse.parseDefault)
      null.asInstanceOf[A] // ugh... gotta find a better way
    }

    lazy val addr = parent.addr

    // TODO: Find a better abstraction
    override def connected = parent.connected
    override def connect = parent.connect
    override def reconnect = parent.reconnect
    override def disconnect = parent.disconnect
    override def clearFd = parent.clearFd
    override def write(data: Array[Byte]) = parent.write(data)
    override def readLine = parent.readLine
    override def readCounted(count: Int) = parent.readCounted(count)
  }
}