From 0700676c034d1bd0e0487bed2f40f86a5962dca2 Mon Sep 17 00:00:00 2001 From: Jack Zhou Date: Mon, 24 Jun 2013 19:40:03 -0700 Subject: [PATCH] Added support for Redis Sentinel. --- src/main/scala/com/redis/IO.scala | 19 ++- src/main/scala/com/redis/NodeAddress.scala | 100 ++++++++++++++ src/main/scala/com/redis/Pool.scala | 18 ++- src/main/scala/com/redis/RedisClient.scala | 6 +- src/main/scala/com/redis/RedisProtocol.scala | 15 +++ src/main/scala/com/redis/SentinelClient.scala | 15 +++ .../scala/com/redis/SentinelOperations.scala | 32 +++++ .../com/redis/cluster/RedisCluster.scala | 3 +- .../scala/com/redis/cluster/RedisShards.scala | 3 +- src/main/scala/com/redis/ds/Deque.scala | 8 +- .../com/redis/SentinelOperationsSpec.scala | 127 ++++++++++++++++++ 11 files changed, 325 insertions(+), 21 deletions(-) create mode 100644 src/main/scala/com/redis/NodeAddress.scala create mode 100644 src/main/scala/com/redis/SentinelClient.scala create mode 100644 src/main/scala/com/redis/SentinelOperations.scala create mode 100644 src/test/scala/com/redis/SentinelOperationsSpec.scala diff --git a/src/main/scala/com/redis/IO.scala b/src/main/scala/com/redis/IO.scala index c98d6758..efb37a38 100644 --- a/src/main/scala/com/redis/IO.scala +++ b/src/main/scala/com/redis/IO.scala @@ -1,15 +1,18 @@ package com.redis import java.io._ -import java.net.{Socket, InetSocketAddress} +import java.net.{InetAddress, Socket, InetSocketAddress} import serialization.Parse.parseStringSafe trait IO extends Log { - val host: String - val port: Int + val addr: NodeAddress + def host: String = addr.addr._1 + def port: Int = addr.addr._2 - var socket: Socket = _ + addr onChange onAddrChange + + @volatile var socket: Socket = _ var out: OutputStream = _ var in: InputStream = _ var db: Int = _ @@ -21,9 +24,17 @@ trait IO extends Log { disconnect && connect } + protected def onAddrChange(addr: InetSocketAddress) { + val sock = socket + if (sock != null && sock.getRemoteSocketAddress != addr) { + sock.close() // just close the socket (pretend the server closed it) + } + } + // Connects the socket, and sets the input and output streams. def connect: Boolean = { try { + val (host, port) = addr.addr socket = new Socket(host, port) socket.setSoTimeout(0) diff --git a/src/main/scala/com/redis/NodeAddress.scala b/src/main/scala/com/redis/NodeAddress.scala new file mode 100644 index 00000000..b9e88ba3 --- /dev/null +++ b/src/main/scala/com/redis/NodeAddress.scala @@ -0,0 +1,100 @@ +package com.redis + +import java.net.InetSocketAddress + +abstract class NodeAddress { + def addr: (String, Int) + + def onChange(callback: InetSocketAddress => Unit) + override def toString = { + val (host, port) = addr + host + ":" + String.valueOf(port) + } +} + +class FixedAddress(host: String, port: Int) extends NodeAddress { + val addr = (host, port) + override def onChange(callback: InetSocketAddress => Unit) { } +} + +class SentinelMonitoredMasterAddress(val sentinels: Seq[(String, Int)], val masterName: String) extends NodeAddress + with Log { + + var master: Option[(String, Int)] = None + + override def addr = master.synchronized { + master match { + case Some((h, p)) => (h, p) + case _ => throw new RuntimeException("All sentinels are down.") + } + } + + private var onChangeCallbacks: List[InetSocketAddress => Unit] = Nil + + override def onChange(callback: InetSocketAddress => Unit) = synchronized { + onChangeCallbacks = callback :: onChangeCallbacks + } + + private def fireCallbacks(addr: InetSocketAddress) = synchronized { + onChangeCallbacks foreach (_(addr)) + } + + def stopMonitoring() { + sentinelListeners foreach (_.stop()) + } + + private val sentinelClients = sentinels.map { case (h, p) => + val client = new SentinelClient(h, p) + master match { // this can be done without synchronization because the threads are not yet live + case Some(_) => + case None => + try { + master = client.getMasterAddrByName(masterName) + } catch { + case e: Throwable => error("Error connecting to sentinel.", e) + } + } + client + } + private val sentinelListeners = sentinelClients map { client => + val listener = new SentinelListener(client) + new Thread(listener).start() + listener + } + + private class SentinelListener(val client: SentinelClient) extends Runnable { + @volatile var running: Boolean = false + + def run() { + running = true + while (running) { + try { + client.synchronized { + client.send("SUBSCRIBE", List("+switch-master"))(()) + } + new client.Consumer((msg: PubSubMessage) => + msg match { + case M(chan, msgText) => + val tokens = msgText split ' ' + val addr = tokens(3) + val port = tokens(4).toInt + master.synchronized { + master = Some(addr, port) + } + fireCallbacks(new InetSocketAddress(addr, port)) + case _ => + }).run() // synchronously read, so we know when a disconnect happens + } catch { + case e: Throwable => error("Error connecting to sentinel.", e) + } + } + } + + def stop() { + client.synchronized { + client.unsubscribe + } + running = false + } + } +} diff --git a/src/main/scala/com/redis/Pool.scala b/src/main/scala/com/redis/Pool.scala index 7dae70b4..b5459427 100644 --- a/src/main/scala/com/redis/Pool.scala +++ b/src/main/scala/com/redis/Pool.scala @@ -4,12 +4,12 @@ import org.apache.commons.pool._ import org.apache.commons.pool.impl._ import com.redis.cluster.ClusterNode -private [redis] class RedisClientFactory(val host: String, val port: Int, val database: Int = 0, val secret: Option[Any] = None) +private [redis] class RedisClientFactory(val addr: NodeAddress, val database: Int = 0, val secret: Option[Any] = None) extends PoolableObjectFactory[RedisClient] { // when we make an object it's already connected def makeObject = { - val cl = new RedisClient(host, port) + val cl = new RedisClient(addr) if (database != 0) cl.select(database) secret.foreach(cl auth _) @@ -30,9 +30,15 @@ private [redis] class RedisClientFactory(val host: String, val port: Int, val da def activateObject(rc: RedisClient): Unit = {} } -class RedisClientPool(val host: String, val port: Int, val maxIdle: Int = 8, val database: Int = 0, val secret: Option[Any] = None) { - val pool = new StackObjectPool(new RedisClientFactory(host, port, database, secret), maxIdle) - override def toString = host + ":" + String.valueOf(port) +class RedisClientPool(val addr: NodeAddress, val maxIdle: Int = 8, val database: Int = 0, val secret: Option[Any] = None) { + def host: String = addr.addr._1 + def port: Int = addr.addr._2 + + def this(host: String, port: Int) = + this(new FixedAddress(host, port)) + + val pool = new StackObjectPool(new RedisClientFactory(addr, database, secret), maxIdle) + override def toString = addr.toString def withClient[T](body: RedisClient => T) = { val client = pool.borrowObject @@ -52,6 +58,6 @@ class RedisClientPool(val host: String, val port: Int, val maxIdle: Int = 8, val * @param poolname must be unique */ class IdentifiableRedisClientPool(val node: ClusterNode) - extends RedisClientPool (node.host, node.port, node.maxIdle, node.database, node.secret){ + extends RedisClientPool (new FixedAddress(node.host, node.port), node.maxIdle, node.database, node.secret) { override def toString = node.nodename } diff --git a/src/main/scala/com/redis/RedisClient.scala b/src/main/scala/com/redis/RedisClient.scala index 3be897aa..54790833 100644 --- a/src/main/scala/com/redis/RedisClient.scala +++ b/src/main/scala/com/redis/RedisClient.scala @@ -57,11 +57,12 @@ trait RedisCommand extends Redis with EvalOperations -class RedisClient(override val host: String, override val port: Int) +class RedisClient(val addr: NodeAddress) extends RedisCommand with PubSub { connect + def this(host: String, port: Int) = this(new FixedAddress(host, port)) def this() = this("localhost", 6379) override def toString = host + ":" + String.valueOf(port) @@ -153,8 +154,7 @@ class RedisClient(override val host: String, override val port: Int) null.asInstanceOf[A] } - val host = parent.host - val port = parent.port + lazy val addr = parent.addr // TODO: Find a better abstraction override def connected = parent.connected diff --git a/src/main/scala/com/redis/RedisProtocol.scala b/src/main/scala/com/redis/RedisProtocol.scala index a6350d2a..139a4bd0 100644 --- a/src/main/scala/com/redis/RedisProtocol.scala +++ b/src/main/scala/com/redis/RedisProtocol.scala @@ -40,6 +40,7 @@ private [redis] trait Reply { type Reply[T] = PartialFunction[(Char, Array[Byte]), T] type SingleReply = Reply[Option[Array[Byte]]] type MultiReply = Reply[Option[List[Option[Array[Byte]]]]] + type MultiMultiReply = Reply[Option[List[Option[List[Option[Array[Byte]]]]]]] def readLine: Array[Byte] def readCounted(c: Int): Array[Byte] @@ -80,6 +81,14 @@ private [redis] trait Reply { } } + val multiMultiBulkReply: MultiMultiReply = { + case (MULTI, str) => + Parsers.parseInt(str) match { + case -1 => None + case n => Some(List.fill(n)(receive(multiBulkReply))) + } + } + def execReply(handlers: Seq[() => Any]): PartialFunction[(Char, Array[Byte]), Option[List[Any]]] = { case (MULTI, str) => Parsers.parseInt(str) match { @@ -146,6 +155,12 @@ private [redis] trait R extends Reply { case _ => Iterator.single(None) }.toList) + def asListOfListPairs[A,B](implicit parseA: Parse[A], parseB: Parse[B]): Option[List[Option[List[Option[(A,B)]]]]] = + receive(multiMultiBulkReply).map(_.map(_.map(_.grouped(2).map { + case List(Some(a), Some(b)) => Some((parseA(a), parseB(b))) + case _ => None + }.toList))) + def asQueuedList: Option[List[Option[String]]] = receive(queuedReplyList).map(_.map(_.map(Parsers.parseString))) def asExec(handlers: Seq[() => Any]): Option[List[Any]] = receive(execReply(handlers)) diff --git a/src/main/scala/com/redis/SentinelClient.scala b/src/main/scala/com/redis/SentinelClient.scala new file mode 100644 index 00000000..da6a1171 --- /dev/null +++ b/src/main/scala/com/redis/SentinelClient.scala @@ -0,0 +1,15 @@ +package com.redis + +class SentinelClient(override val host: String, override val port: Int) extends Redis + with SentinelOperations + with PubSub { + + lazy val addr: NodeAddress = new FixedAddress(host, port) // can only be fixed, not a dynamic master + + def this() = this("localhost", 26379) + override def toString = host + ":" + String.valueOf(port) + + // publishing is not allowed on a sentinel's pub/sub channel + override def publish(channel: String, msg: String): Option[Long] = + throw new RuntimeException("Publishing is not supported on a sentinel.") +} diff --git a/src/main/scala/com/redis/SentinelOperations.scala b/src/main/scala/com/redis/SentinelOperations.scala new file mode 100644 index 00000000..f36c1946 --- /dev/null +++ b/src/main/scala/com/redis/SentinelOperations.scala @@ -0,0 +1,32 @@ +package com.redis + +import serialization._ + +trait SentinelOperations { self: Redis => + + def masters[K,V](implicit format: Format, parseK: Parse[K], parseV: Parse[V]): Option[List[Option[Map[K,V]]]] = + send("SENTINEL", List("MASTERS"))(asListOfListPairs[K,V].map(_.map(_.map(_.flatten.toMap)))) + + def slaves[K,V](name: String)(implicit format: Format, parseK: Parse[K], parseV: Parse[V]): + Option[List[Option[Map[K,V]]]] = + send("SENTINEL", List("SLAVES", name))(asListOfListPairs[K,V].map(_.map(_.map(_.flatten.toMap)))) + + def isMasterDownByAddr(host: String, port: Int): Option[(Boolean, String)] = + send("SENTINEL", List("IS-MASTER-DOWN-BY-ADDR", host, port))(asList) match { + case Some(List(Some(down), Some(leader))) => Some(down.toInt == 1, leader) + case _ => None + } + + def getMasterAddrByName(name: String): Option[(String, Int)] = + send("SENTINEL", List("GET-MASTER-ADDR-BY-NAME", name))(asList[String]) match { + case Some(List(Some(h), Some(p))) => Some(h, p.toInt) + case _ => None + } + + def reset(pattern: String): Option[Int] = + send("SENTINEL", List("RESET", pattern))(asInt) + + def failover(name: String): Boolean = + send("SENTINEL", List("FAILOVER", name))(asBoolean) + +} diff --git a/src/main/scala/com/redis/cluster/RedisCluster.scala b/src/main/scala/com/redis/cluster/RedisCluster.scala index 20c548aa..712629f8 100644 --- a/src/main/scala/com/redis/cluster/RedisCluster.scala +++ b/src/main/scala/com/redis/cluster/RedisCluster.scala @@ -67,8 +67,7 @@ case class ClusterNode(nodename: String, host: String, port: Int, database: Int abstract class RedisCluster(hosts: ClusterNode*) extends RedisCommand { // not needed at cluster level - override val host = null - override val port = 0 + lazy val addr = new FixedAddress(null, 0) // abstract val val keyTag: Option[KeyTag] diff --git a/src/main/scala/com/redis/cluster/RedisShards.scala b/src/main/scala/com/redis/cluster/RedisShards.scala index b5d5215b..1fca84ad 100644 --- a/src/main/scala/com/redis/cluster/RedisShards.scala +++ b/src/main/scala/com/redis/cluster/RedisShards.scala @@ -14,8 +14,7 @@ import scala.util.matching.Regex abstract class RedisShards(val hosts: List[ClusterNode]) extends RedisCommand { // not needed at cluster level - override val host = null - override val port = 0 + lazy val addr = new FixedAddress(null, 0) // abstract val val keyTag: Option[KeyTag] diff --git a/src/main/scala/com/redis/ds/Deque.scala b/src/main/scala/com/redis/ds/Deque.scala index 8d0b6e7a..72bc36ea 100644 --- a/src/main/scala/com/redis/ds/Deque.scala +++ b/src/main/scala/com/redis/ds/Deque.scala @@ -73,13 +73,13 @@ abstract class RedisDeque[A](val blocking: Boolean = false, val timeoutInSecs: I } } -import com.redis.{Redis, ListOperations} +import com.redis.{Redis, ListOperations, NodeAddress, FixedAddress} -class RedisDequeClient(val h: String, val p: Int) { +class RedisDequeClient(val a: NodeAddress) { + def this(h: String, p: Int) = this(new FixedAddress(h, p)) def getDeque[A](k: String, blocking: Boolean = false, timeoutInSecs: Int = 0)(implicit format: Format, parse: Parse[A]) = new RedisDeque(blocking, timeoutInSecs)(format, parse) with ListOperations with Redis { - val host = h - val port = p + lazy val addr = a val key = k connect } diff --git a/src/test/scala/com/redis/SentinelOperationsSpec.scala b/src/test/scala/com/redis/SentinelOperationsSpec.scala new file mode 100644 index 00000000..fdb37aed --- /dev/null +++ b/src/test/scala/com/redis/SentinelOperationsSpec.scala @@ -0,0 +1,127 @@ +package com.redis + +import org.scalatest.{OptionValues, BeforeAndAfterAll, BeforeAndAfterEach, FunSpec} +import org.scalatest.matchers.ShouldMatchers +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class SentinelOperationsSpec extends FunSpec + with ShouldMatchers + with BeforeAndAfterEach + with BeforeAndAfterAll + with OptionValues { + + val hosts = List(("localhost", 6379), ("localhost", 6380), ("localhost", 6381), ("localhost", 6382)) + val sentinels = List(("localhost", 26379), ("localhost", 26380), ("localhost", 26381), ("localhost", 26382)) + val hostClients = hosts map Function.tupled(new RedisClient(_, _)) + val sentinelClients = sentinels map Function.tupled(new SentinelClient(_, _)) + + override def beforeAll() { + hostClients.foreach { client => + if (client.port != 6379) { + client.slaveof("localhost", 6379) + } + } + + Thread sleep 10000 // wait for all the syncs to complete + } + + override def afterAll() { + hostClients.foreach (_.slaveof()) + + Thread sleep 10000 + } + + describe("masters") { + it("should return all masters") { + sentinelClients.foreach { client => + val masters = client.masters match { + case Some(m) => m.collect { + case Some(details) => (details("name"), details("ip"), details("port")) + } + case _ => Nil + } + + masters should equal (List(("scala-redis-test", "127.0.0.1", "6379"))) + } + } + } + + describe("slaves") { + it("should return all slaves") { + sentinelClients.foreach { client => + val slaves = (client.slaves("scala-redis-test") match { + case Some(s) => s.collect { + case Some(details) => (details("ip"), details("port").toInt) + } + case _ => Nil + }).sorted + + slaves should equal (List(("127.0.0.1", 6380), ("127.0.0.1", 6381), ("127.0.0.1", 6382))) + } + } + } + + describe("is-master-down-by-addr") { + it("should report master is not down") { + sentinelClients.foreach { client => + client.isMasterDownByAddr("127.0.0.1", 6379).get._1 should equal (false) + } + } + } + + describe("get-master-addr-by-name") { + it("should return master address") { + sentinelClients.foreach { client => + client.getMasterAddrByName("scala-redis-test") should equal (Some(("127.0.0.1", 6379))) + } + } + } + + describe("reset") { + it("should reset one master") { + sentinelClients.foreach { client => + client.reset("scala-redis-*") should equal (Some(1)) + } + + Thread sleep 10000 // wait for sentinels to pick up slaves again + } + } + + describe("failover") { + it("should automatically update master ip in client") { + val masterAddr = new SentinelMonitoredMasterAddress(sentinels, "scala-redis-test") + val master = new RedisClient(masterAddr) + + val oldPort = master.port + + sentinelClients(0).failover("scala-redis-test") + Thread sleep 30000 + + master.port should not equal oldPort + + masterAddr.stopMonitoring() + } + + it("should automatically update master ip in pool") { + val masterAddr = new SentinelMonitoredMasterAddress(sentinels, "scala-redis-test") + val pool = new RedisClientPool(masterAddr) + + var oldPort = -1 + pool.withClient { client => + oldPort = client.port + } + oldPort should not equal -1 + + sentinelClients(0).failover("scala-redis-test") + Thread sleep 30000 + + pool.withClient { client => + client.port should not equal oldPort + } + + masterAddr.stopMonitoring() + } + } +}