Skip to content
This repository has been archived by the owner on Feb 9, 2019. It is now read-only.

Add Redis Sentinel support through Provider for JedisSentinelPool #172

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
6 changes: 4 additions & 2 deletions redis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ val o = play.api.cache.Cache.getAs[String]("mykey")

#### Configurable

* Point to your Redis server using configuration settings ```redis.host```, ```redis.port```, ```redis.password``` and ```redis.database``` (defaults: ```localhost```, ```6379```, ```null``` and ```0```)
* Point to your Redis server using configuration settings ```redis.host```, ```redis.port```, ```redis.password``` and ```redis.database``` (defaults: ```localhost```, ```6379```, ```null``` and ```0```).
* Alternatively, specify a URI-based configuration using ```redis.uri``` (for example: ```redis.uri="redis://user:password@localhost:6379"```).
* Configure your Sentinels using ```redis.master.name``` and ```redis.sentinel.hosts``` (defaults: ```mymaster```, ```localhost:26379```).
* If ```redis.<name>.sentinel.mode``` is true (default: ```false```), then the ```<name>``` named cached will always contact the master node. Otherwise, the ```redis.host```, ```redis.port```, or ```redis.uri``` settings will be used.
* Set the timeout in milliseconds using ```redis.timeout``` (default is 2000).
* Configure any aspect of the connection pool. See [the documentation for commons-pool2 ```GenericObjectPoolConfig```](https://commons.apache.org/proper/commons-pool/apidocs/org/apache/commons/pool2/impl/GenericObjectPoolConfig.html), the underlying pool implementation, for more information on each setting.
* redis.pool.maxIdle
Expand Down Expand Up @@ -68,7 +70,7 @@ pool.withJedisClient { client =>
}
```
play = 2.4.x and 2.5.x:
Because the underlying Sedis Pool was injected for the cache module to use, you can just inject the sedis Pool yourself, something like this:
Because the underlying Sedis and Sentinel Pool was injected for the cache module to use, you can just inject the Pool yourself, something like this:

```scala
//scala
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.typesafe.play.redis

import PoolConfig.createPoolConfig

import java.net.URI
import javax.inject.{Provider, Inject, Singleton}

import org.apache.commons.lang3.builder.ReflectionToStringBuilder
import play.api.inject.ApplicationLifecycle
import play.api.{Logger, Configuration}
import redis.clients.jedis.{JedisPool, JedisPoolConfig}
import redis.clients.jedis.JedisPool

import scala.concurrent.Future

Expand Down Expand Up @@ -53,22 +55,4 @@ class JedisPoolProvider @Inject()(config: Configuration, lifecycle: ApplicationL

jedisPool
}

private def createPoolConfig(config: Configuration): JedisPoolConfig = {
val poolConfig: JedisPoolConfig = new JedisPoolConfig()
config.getInt("redis.pool.maxIdle").foreach(poolConfig.setMaxIdle)
config.getInt("redis.pool.minIdle").foreach(poolConfig.setMinIdle)
config.getInt("redis.pool.maxTotal").foreach(poolConfig.setMaxTotal)
config.getLong("redis.pool.maxWaitMillis").foreach(poolConfig.setMaxWaitMillis)
config.getBoolean("redis.pool.testOnBorrow").foreach(poolConfig.setTestOnBorrow)
config.getBoolean("redis.pool.testOnReturn").foreach(poolConfig.setTestOnReturn)
config.getBoolean("redis.pool.testWhileIdle").foreach(poolConfig.setTestWhileIdle)
config.getLong("redis.pool.timeBetweenEvictionRunsMillis").foreach(poolConfig.setTimeBetweenEvictionRunsMillis)
config.getInt("redis.pool.numTestsPerEvictionRun").foreach(poolConfig.setNumTestsPerEvictionRun)
config.getLong("redis.pool.minEvictableIdleTimeMillis").foreach(poolConfig.setMinEvictableIdleTimeMillis)
config.getLong("redis.pool.softMinEvictableIdleTimeMillis").foreach(poolConfig.setSoftMinEvictableIdleTimeMillis)
config.getBoolean("redis.pool.lifo").foreach(poolConfig.setLifo)
config.getBoolean("redis.pool.blockWhenExhausted").foreach(poolConfig.setBlockWhenExhausted)
poolConfig
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.typesafe.play.redis

import PoolConfig.createPoolConfig

import java.net.URI
import javax.inject.{Inject, Provider, Singleton}

import collection.JavaConverters._
import org.apache.commons.lang3.builder.ReflectionToStringBuilder
import play.api.{Configuration, Logger}
import play.api.inject.ApplicationLifecycle
import redis.clients.jedis.JedisSentinelPool

import scala.concurrent.Future

@Singleton
class JedisSentinelPoolProvider @Inject()(config: Configuration, lifecycle: ApplicationLifecycle) extends Provider[JedisSentinelPool] {

lazy val logger = Logger("redis.module")
lazy val get: JedisSentinelPool = {
val jedisSentinelPool = {
val redisUri = config.getString("redis.uri").map(new URI(_))

val masterName = config.getString("redis.master.name").getOrElse("mymaster")

val sentinelHosts = config.getStringList("redis.sentinel.hosts").getOrElse(Seq("localhost:26379").asJava)

val sentinelSet = new java.util.HashSet[String]()
sentinelSet.addAll(sentinelHosts)

val password = config.getString("redis.password")
.orElse(redisUri.map(_.getUserInfo).filter(_ != null).filter(_ contains ":").map(_.split(":", 2)(1)))
.orNull

val timeout = config.getInt("redis.timeout").getOrElse(2000)

val poolConfig = createPoolConfig(config)
Logger.info(s"Redis Plugin enabled. Monitoring Redis master $masterName with Sentinels $sentinelSet and timeout $timeout.")
Logger.info("Redis Plugin pool configuration: " + new ReflectionToStringBuilder(poolConfig).toString)

new JedisSentinelPool(masterName, sentinelSet, poolConfig, timeout, password)
}

logger.info("Starting Jedis Sentinel Pool Provider")

lifecycle.addStopHook(() => Future.successful {
logger.info("Stopping Jedis Sentinel Pool Provider")
jedisSentinelPool.destroy()
})

jedisSentinelPool
}
}
24 changes: 24 additions & 0 deletions redis/src/main/scala/com/typesafe/play/redis/PoolConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.typesafe.play.redis

import play.api.Configuration
import redis.clients.jedis.JedisPoolConfig

object PoolConfig {
def createPoolConfig(config: Configuration): JedisPoolConfig = {
val poolConfig: JedisPoolConfig = new JedisPoolConfig()
config.getInt("redis.pool.maxIdle").foreach(poolConfig.setMaxIdle)
config.getInt("redis.pool.minIdle").foreach(poolConfig.setMinIdle)
config.getInt("redis.pool.maxTotal").foreach(poolConfig.setMaxTotal)
config.getLong("redis.pool.maxWaitMillis").foreach(poolConfig.setMaxWaitMillis)
config.getBoolean("redis.pool.testOnBorrow").foreach(poolConfig.setTestOnBorrow)
config.getBoolean("redis.pool.testOnReturn").foreach(poolConfig.setTestOnReturn)
config.getBoolean("redis.pool.testWhileIdle").foreach(poolConfig.setTestWhileIdle)
config.getLong("redis.pool.timeBetweenEvictionRunsMillis").foreach(poolConfig.setTimeBetweenEvictionRunsMillis)
config.getInt("redis.pool.numTestsPerEvictionRun").foreach(poolConfig.setNumTestsPerEvictionRun)
config.getLong("redis.pool.minEvictableIdleTimeMillis").foreach(poolConfig.setMinEvictableIdleTimeMillis)
config.getLong("redis.pool.softMinEvictableIdleTimeMillis").foreach(poolConfig.setSoftMinEvictableIdleTimeMillis)
config.getBoolean("redis.pool.lifo").foreach(poolConfig.setLifo)
config.getBoolean("redis.pool.blockWhenExhausted").foreach(poolConfig.setBlockWhenExhausted)
poolConfig
}
}
20 changes: 16 additions & 4 deletions redis/src/main/scala/com/typesafe/play/redis/RedisModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package com.typesafe.play.redis

import javax.inject.{Inject, Provider}

import org.sedis.Pool
import org.sedis.{Pool, SentinelPool}
import play.api.cache.{CacheApi, Cached, NamedCache}
import play.api.inject._
import play.api.{Configuration, Environment}
import play.cache.{CacheApi => JavaCacheApi, DefaultCacheApi => DefaultJavaCacheApi, NamedCacheImpl}
import redis.clients.jedis.JedisPool
import play.cache.{NamedCacheImpl, CacheApi => JavaCacheApi, DefaultCacheApi => DefaultJavaCacheApi}
import redis.clients.jedis.{JedisPool, JedisSentinelPool}

/**
* Redis cache components for compile time injection
Expand Down Expand Up @@ -49,7 +49,10 @@ class RedisModule extends Module {
val namedCache = named(name)
val cacheApiKey = bind[CacheApi].qualifiedWith(namedCache)
Seq(
cacheApiKey.to(new NamedRedisCacheApiProvider(name, bind[Pool], environment.classLoader)),
if (configuration.getBoolean(s"redis.$name.sentinel.mode").getOrElse(false))
cacheApiKey.to(new NamedSentinelCacheApiProvider(name, bind[SentinelPool], environment.classLoader))
else
cacheApiKey.to(new NamedRedisCacheApiProvider(name, bind[Pool], environment.classLoader)),
bind[JavaCacheApi].qualifiedWith(namedCache).to(new NamedJavaCacheApiProvider(cacheApiKey)),
bind[Cached].qualifiedWith(namedCache).to(new NamedCachedProvider(cacheApiKey))
)
Expand All @@ -58,6 +61,8 @@ class RedisModule extends Module {
val defaultBindings = Seq(
bind[JedisPool].toProvider[JedisPoolProvider],
bind[Pool].toProvider[SedisPoolProvider],
bind[JedisSentinelPool].toProvider[JedisSentinelPoolProvider],
bind[SentinelPool].toProvider[SedisSentinelPoolProvider],
bind[JavaCacheApi].to[DefaultJavaCacheApi]
) ++ bindCaches.flatMap(bindCache)

Expand All @@ -82,6 +87,13 @@ class NamedRedisCacheApiProvider(namespace: String, client: BindingKey[Pool], cl
}
}

class NamedSentinelCacheApiProvider(namespace: String, client: BindingKey[SentinelPool], classLoader: ClassLoader) extends Provider[CacheApi] {
@Inject private var injector: Injector = _
lazy val get: CacheApi = {
new SentinelCacheApi(namespace, injector.instanceOf(client), classLoader)
}
}

class NamedJavaCacheApiProvider(key: BindingKey[CacheApi]) extends Provider[JavaCacheApi] {
@Inject private var injector: Injector = _
lazy val get: JavaCacheApi = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.typesafe.play.redis

import javax.inject.{Inject, Provider, Singleton}

import org.sedis.SentinelPool
import redis.clients.jedis.JedisSentinelPool

@Singleton
class SedisSentinelPoolProvider @Inject()(jedisSentinelPool: JedisSentinelPool) extends Provider[SentinelPool] {
lazy val get: SentinelPool = {
val sedisSentinelPool = {
new SentinelPool(jedisSentinelPool)
}
sedisSentinelPool
}
}
122 changes: 122 additions & 0 deletions redis/src/main/scala/com/typesafe/play/redis/SentinelCacheApi.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.typesafe.play.redis

import java.io._
import javax.inject.{Inject, Singleton}

import biz.source_code.base64Coder.Base64Coder
import org.sedis.SentinelPool
import play.api.Logger
import play.api.cache.CacheApi

import scala.concurrent.duration.Duration
import scala.reflect.ClassTag


@Singleton
class SentinelCacheApi @Inject()(val namespace: String, sedisPool: SentinelPool, classLoader: ClassLoader) extends CacheApi {

private val namespacedKey: (String => String) = { x => s"$namespace::$x" }

def get[T](userKey: String)(implicit ct: ClassTag[T]): Option[T] = {
Logger.trace(s"Reading key ${namespacedKey(userKey)}")

try {
val rawData = sedisPool.withJedisClient { client => client.get(namespacedKey(userKey)) }
rawData match {
case null =>
None
case _ =>
val data: Seq[String] = rawData.split("-")
val bytes = Base64Coder.decode(data.last)
data.head match {
case "oos" => Some(withObjectInputStream(bytes)(_.readObject().asInstanceOf[T]))
case "string" => Some(withDataInputStream(bytes)(_.readUTF().asInstanceOf[T]))
case "int" => Some(withDataInputStream(bytes)(_.readInt().asInstanceOf[T]))
case "long" => Some(withDataInputStream(bytes)(_.readLong().asInstanceOf[T]))
case "boolean" => Some(withDataInputStream(bytes)(_.readBoolean().asInstanceOf[T]))
case _ => throw new IOException(s"was not able to recognize the type of serialized value. The type was ${data.head} ")
}
}
} catch {
case ex: Exception =>
Logger.warn("could not deserialize key:" + namespacedKey(userKey), ex)
None
}
}

def getOrElse[A: ClassTag](userKey: String, expiration: Duration)(orElse: => A) = {
get[A](userKey).getOrElse {
val value = orElse
set(userKey, value, expiration)
value
}
}

def remove(userKey: String): Unit = sedisPool.withJedisClient(_.del(namespacedKey(userKey)))

def set(userKey: String, value: Any, expiration: Duration) {
val expirationInSec = if (expiration == Duration.Inf) 0 else expiration.toSeconds.toInt
val key = namespacedKey(userKey)

var oos: ObjectOutputStream = null
var dos: DataOutputStream = null
try {
val baos = new ByteArrayOutputStream()
val prefix = value match {
case _: String =>
dos = new DataOutputStream(baos)
dos.writeUTF(value.asInstanceOf[String])
"string"
case _: Int =>
dos = new DataOutputStream(baos)
dos.writeInt(value.asInstanceOf[Int])
"int"
case _: Long =>
dos = new DataOutputStream(baos)
dos.writeLong(value.asInstanceOf[Long])
"long"
case _: Boolean =>
dos = new DataOutputStream(baos)
dos.writeBoolean(value.asInstanceOf[Boolean])
"boolean"
case _: Serializable =>
oos = new ObjectOutputStream(baos)
oos.writeObject(value)
oos.flush()
"oos"
case _ =>
throw new IOException("could not serialize: " + value.toString)
}

val redisV = prefix + "-" + new String(Base64Coder.encode(baos.toByteArray))
Logger.trace(s"Setting key $key to $redisV")

sedisPool.withJedisClient { client =>
client.set(key, redisV)
if (expirationInSec != 0) client.expire(key, expirationInSec)
}
} catch {
case ex: IOException =>
Logger.warn("could not serialize key:" + key + " and value:" + value.toString + " ex:" + ex.toString)
} finally {
if (oos != null) oos.close()
if (dos != null) dos.close()
}
}

private class ClassLoaderObjectInputStream(stream: InputStream) extends ObjectInputStream(stream) {
override protected def resolveClass(desc: ObjectStreamClass) = {
Class.forName(desc.getName, false, classLoader)
}
}

private def withDataInputStream[T](bytes: Array[Byte])(f: DataInputStream => T): T = {
val dis = new DataInputStream(new ByteArrayInputStream(bytes))
try f(dis) finally dis.close()
}

private def withObjectInputStream[T](bytes: Array[Byte])(f: ObjectInputStream => T): T = {
val ois = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes))
try f(ois) finally ois.close()
}
}