Redis Management

frikilax edited this page Apr 28, 2020 · 2 revisions

Description

Darwin now has a centralized Redis manager that handles concurrent Redis connections for each thread of a filter.

This page is here to help developers use Redis.

How it works

Each filter has a manager singleton that handles Redis connections globally for its threads: each thread owns its own active connection to Redis, through which it adds and fetches data.

The manager lets you define the connection to Redis either with an IP:port pair or with the path to a Unix socket. At least one of those configurations must be given before trying to query Redis. Parameters such as the health check interval (more on this below) and the connection timeout can be set manually, which allows runtime configuration through parameters.
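As a rough illustration of the settings described above, here is a minimal sketch of a configuration holder. The types `TcpTarget`, `UnixTarget` and `RedisSettings` and the helper functions are hypothetical and are not part of the RedisManager API; the 8 second interval comes from this page, while the timeout value is an arbitrary placeholder.

```cpp
#include <chrono>
#include <cstdint>
#include <string>
#include <variant>

// Hypothetical types, for illustration only: not the RedisManager API.
struct TcpTarget  { std::string ip; std::uint16_t port; };
struct UnixTarget { std::string socketPath; };

struct RedisSettings {
    // std::monostate means "nothing configured yet": Redis must not be queried.
    std::variant<std::monostate, TcpTarget, UnixTarget> target;
    std::chrono::seconds healthCheckInterval{8};    // default stated on this page
    std::chrono::milliseconds connectTimeout{500};  // arbitrary value (assumption)
};

// True once either an IP:port pair or a Unix socket path has been given.
inline bool IsConfigured(const RedisSettings& s) {
    return !std::holds_alternative<std::monostate>(s.target);
}

// Human-readable form of the configured target, handy for logging.
inline std::string Describe(const RedisSettings& s) {
    if (const auto* tcp = std::get_if<TcpTarget>(&s.target))
        return tcp->ip + ":" + std::to_string(tcp->port);
    if (const auto* sock = std::get_if<UnixTarget>(&s.target))
        return "unix:" + sock->socketPath;
    return "unconfigured";
}
```

Modelling the target as a variant makes the "at least one configuration before querying" rule checkable in one place.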

Health checking

In the past, threads' connections were handled by a separate thread, the Janitor, whose only role was to cycle through each thread's connection and ping the corresponding Redis server. This method added overhead and complexity. Connection health checks are now handled directly by each thread during calls to servers:

  • the health check interval (default 8 seconds) defines how long after the last call a health check becomes necessary. In other words, with the default of 8 seconds, a thread triggers a health check if its last call to the Redis server was 8 seconds or more in the past.
  • the health check itself works as follows, each step being tried only if the previous one did not succeed:
    1. if there is no active connection, don't proceed
    2. try to ping the server; if it responds, the health check is successful
    3. try to reconnect to the server; if the reconnection succeeds, the health check is successful
    4. try to open a new connection to the current globally configured main server; if the connection succeeds, the health check is successful
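The health check steps above can be sketched as a short fallback chain. This is only an illustration under stated assumptions: `ThreadConnection`, `HealthStatus`, `HealthCheckDue` and `HealthCheck` are hypothetical names, the callbacks stand in for the real network calls, and reporting "skipped" when there is no active connection is one reading of "don't proceed".

```cpp
#include <chrono>
#include <functional>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for one thread's connection; callbacks replace network calls.
struct ThreadConnection {
    bool active = false;
    Clock::time_point lastCall{};
    std::function<bool()> ping;           // step 2
    std::function<bool()> reconnect;      // step 3
    std::function<bool()> connectToMain;  // step 4
};

enum class HealthStatus { Skipped, Healthy, Failed };

// A check is due when the last call is at least `interval` old.
inline bool HealthCheckDue(const ThreadConnection& c, Clock::time_point now,
                           std::chrono::seconds interval) {
    return now - c.lastCall >= interval;
}

// Tries each recovery step in order and stops at the first success.
inline HealthStatus HealthCheck(const ThreadConnection& c) {
    if (!c.active) return HealthStatus::Skipped;  // step 1 (assumed meaning)
    if (c.ping && c.ping()) return HealthStatus::Healthy;
    if (c.reconnect && c.reconnect()) return HealthStatus::Healthy;
    if (c.connectToMain && c.connectToMain()) return HealthStatus::Healthy;
    return HealthStatus::Failed;
}
```

Because each step short-circuits, a healthy server costs a single ping, and the more expensive reconnection paths only run after a failure.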

Connection management

Once the connection method is set, it can be validated with one of 2 methods:

  • Connect() simply tries to connect to the configured Redis server, whether it is a Unix socket or an IP:port. This function returns whether the connection was successful or not.
  • FindAndConnect() uses the configured and known connections to find a Redis master, and connects to it. This function keeps information on the master and replicas in the global configuration, for later use during reconnection attempts.
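To make the idea of master discovery more concrete, here is a simplified sketch that picks a master out of a list of known servers and remembers the replicas. It is not the actual FindAndConnect() implementation: `KnownServer`, `Topology` and `Discover` are hypothetical, and the role strings follow what the Redis ROLE command reports ("master" or "slave").

```cpp
#include <optional>
#include <string>
#include <vector>

// Hypothetical snapshot of what each known server answered to a role query.
struct KnownServer {
    std::string address;
    std::string role;   // "master" or "slave", as reported by the Redis ROLE command
    bool reachable;
};

// What a discovery keeps for later reconnection attempts.
struct Topology {
    std::optional<std::string> master;
    std::vector<std::string> replicas;
};

// Keeps the first reachable master and every reachable replica.
inline Topology Discover(const std::vector<KnownServer>& servers) {
    Topology topo;
    for (const auto& s : servers) {
        if (!s.reachable) continue;
        if (s.role == "master" && !topo.master) {
            topo.master = s.address;
        } else if (s.role == "slave") {
            topo.replicas.push_back(s.address);
        }
    }
    return topo;
}
```

Keeping the replicas alongside the master gives later reconnection attempts a list of servers to ask when the current master disappears.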

Connection issues

During the lifetime of the filter(s), the Redis server(s) can become unavailable or change their priorities, or the connection can simply be lost.

The Redis Manager has ways to detect and remediate this:

  • IsMaster() and GetRole() return the state of the currently connected server
  • FindAndConnectWithRateLimiting() refreshes global connection information by querying all known servers to find the new master and replicas. In most cases it is able to globally reconfigure the knowledge of currently active and configured Redis servers and to reconnect the calling thread. Be careful though: this function only updates the global configuration and the calling thread's own connection entry, so all other threads have to pick up the new global configuration themselves, either with this function or with Connect(). Also be aware that this function applies rate limiting: discovery won't happen if the last discovery by a thread was less than health check interval seconds ago. This is the preferred way, and should be used in favor of FindAndConnect() during normal run to avoid flooding networks and Redis servers during reconnection attempts!
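The rate limiting described above can be pictured with a small time gate shared by all threads. This is a sketch under assumptions, not the manager's code: `DiscoveryLimiter` and `TryAcquire` are hypothetical names, and the mutex reflects the fact that several threads may ask for a discovery at once.

```cpp
#include <chrono>
#include <mutex>
#include <optional>

// Hypothetical gate: lets a discovery through only if the previous one is old enough.
class DiscoveryLimiter {
public:
    using Clock = std::chrono::steady_clock;

    explicit DiscoveryLimiter(std::chrono::seconds interval) : interval_(interval) {}

    // Returns true if the caller may run a discovery now, and records the attempt.
    bool TryAcquire(Clock::time_point now = Clock::now()) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (last_ && now - *last_ < interval_) return false;  // too soon, skip
        last_ = now;
        return true;
    }

private:
    std::chrono::seconds interval_;
    std::optional<Clock::time_point> last_;  // empty until the first discovery
    std::mutex mutex_;
};
```

With an 8 second interval, a burst of failing threads results in a single discovery instead of one per thread, which is the flooding the page warns about.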

Queries

Queries can be made as soon as a valid connection has been established (through Connect() and/or FindAndConnect*()). The querying function was built for flexibility, and always returns the query status (be it an error, an integer, a string, an array...). Different prototypes allow the caller to retrieve the expected data type (an integer, a string or an array). In the case of an array, the std::any type is used to store the data (as in the array example under Typical Usage), and it is the filter's responsibility to parse the object correctly. Note that the shape of the structure does not differ from the one returned by Redis.

In addition, a parameter makes it easier to manage disconnections and/or replica write failures: reconnectRetry automatically triggers a FindAndConnectWithRateLimiting() call to handle disconnection errors AND write errors when connected to a (newly appointed) replica. This behaviour lengthens the call when errors occur, but relieves calling filters of the responsibility to respond to some errors during queries:

  • if the server is disconnected, the currently configured connection will be tried (allows faster reconnection for following threads when one of them previously triggered a discovery)
  • if the main configured connection is not valid, a new discovery is triggered through FindAndConnectWithRateLimiting()
  • if the connected server is a replica and the query failed because of that, the new master will be searched and connected (note that a filter COULD use a replica as long as the queries don't involve writing)
  • if a new master connection was ultimately found, a new attempt is made for the query, and the result (whatever it is) is returned
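The recovery sequence listed above can be summarised in a few lines of control flow. The sketch below is an interpretation, not the library's code: `Outcome`, `RetryHooks` and `QueryWithRetry` are hypothetical, and the callbacks stand in for the real query, for a connection attempt with the currently configured server, and for FindAndConnectWithRateLimiting(). All three hooks must be set before calling.

```cpp
#include <functional>

// Hypothetical outcomes of a single query attempt.
enum class Outcome { Ok, Disconnected, ReadOnlyReplica, OtherError };

struct RetryHooks {
    std::function<Outcome()> runQuery;
    std::function<bool()> connectToConfigured;  // try the currently configured connection
    std::function<bool()> discover;             // stand-in for FindAndConnectWithRateLimiting()
};

// Runs the query, and on a recoverable error tries to reconnect and run it once more.
inline Outcome QueryWithRetry(const RetryHooks& h, bool reconnectRetry) {
    const Outcome first = h.runQuery();
    if (!reconnectRetry) return first;

    bool recovered = false;
    if (first == Outcome::Disconnected) {
        // Cheap path first: another thread may already have refreshed the configuration.
        recovered = h.connectToConfigured() || h.discover();
    } else if (first == Outcome::ReadOnlyReplica) {
        // A write failed on a replica: look for the new master.
        recovered = h.discover();
    }
    // Second attempt only if a connection was found; its result is returned as-is.
    return recovered ? h.runQuery() : first;
}
```

Errors that are neither a disconnection nor a replica write failure are returned untouched, which is why the calling filter still has to inspect the return code.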

Filters still need to check return codes and behave accordingly! But this option allows for simple and hassle-free use of Redis connections in most cases. If specific behaviour is needed, the publicly available functions allow for disconnections/connections/discoveries, and if new functionality is required, feel free to open an issue or create a PR!

Typical Usage

For a filter, only two steps are necessary:

  • the setup (before launching the threads):
darwin::toolkit::RedisManager& redis = darwin::toolkit::RedisManager::GetInstance();
redis.SetUnixConnection(redis_socket_path);
// Option 1: direct connection, without looking for master/replicas
return redis.Connect();
// Option 2 (use instead of option 1): check server status and automatically connect to the master
// return redis.FindAndConnect();
  • a query without data return:
int resCode;
darwin::toolkit::RedisManager& redis = darwin::toolkit::RedisManager::GetInstance();
resCode = redis.Query(std::vector<std::string>{"LPUSH", _redis_list_name, logs});
  • a query with expected integer, and automatic reconnection:
int resCode;
long long int result;
darwin::toolkit::RedisManager& redis = darwin::toolkit::RedisManager::GetInstance();
resCode = redis.Query(std::vector<std::string>{"LPUSH", _redis_list_name, logs}, result, /*automatic reconnection*/ true);

example taken from Flogs

Returned data is written to an argument passed by reference; a query called without such an argument yields nothing but the return code.

  • a query with an array data return:
std::any result;
std::vector<std::any> result_vector; // receives the unpacked array reply
darwin::toolkit::RedisManager& redis = darwin::toolkit::RedisManager::GetInstance();
if(redis.Query(std::vector<std::string>{"SPOP", _redis_list_name, std::to_string(len)}, result) != REDIS_REPLY_ARRAY)
{
    DARWIN_LOG_ERROR("AnomalyThread::REDISPopLogs:: Not the expected Redis response");
    return -1;
}

try {
    result_vector = std::any_cast<std::vector<std::any>>(result);
}
catch (const std::bad_any_cast&) {}

DARWIN_LOG_DEBUG("Got " + std::to_string(result_vector.size()) + " entries from Redis");

for(auto& object : result_vector) {
    try {
        logs.emplace_back(std::any_cast<std::string>(object));
    }
    catch(const std::bad_any_cast&) {}
}

example taken from Ftanomaly

The returned data is passed by reference in the result variable as a std::any, which can be cast (within a try/catch) to the expected data types.

Of course, the result code should be checked before casting.
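As an alternative to try/catch, the pointer form of std::any_cast returns a null pointer on a type mismatch instead of throwing. The helper below is a small sketch, not part of the toolkit: `ExtractStrings` is a hypothetical name, and it assumes, as in the example above, that an array reply is stored as a std::vector<std::any> whose string elements are std::string.

```cpp
#include <any>
#include <string>
#include <vector>

// Collects every std::string element of an array reply; other element types are skipped.
inline std::vector<std::string> ExtractStrings(const std::any& reply) {
    std::vector<std::string> out;
    // The pointer overload of any_cast yields nullptr rather than throwing.
    const auto* items = std::any_cast<std::vector<std::any>>(&reply);
    if (items == nullptr) return out;  // not an array reply
    for (const auto& item : *items) {
        if (const auto* text = std::any_cast<std::string>(&item)) {
            out.push_back(*text);
        }
    }
    return out;
}
```

A filter could call such a helper once the return code has been checked, and treat an empty result as "nothing usable in the reply".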

This return code can be any of:

  • REDIS_REPLY_STRING
  • REDIS_REPLY_ARRAY
  • REDIS_REPLY_INTEGER
  • REDIS_REPLY_NIL
  • REDIS_REPLY_STATUS
  • REDIS_REPLY_ERROR (logical error)
  • REDIS_CONNNECTION_ERROR (connection error)