From: W.C.A. Wijngaards Date: Thu, 24 Jul 2025 09:05:25 +0000 (+0200) Subject: - Redis checks for server down and throttles reconnects. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=424f86466a06ca7d0730d257127f256a28d582f2;p=thirdparty%2Funbound.git - Redis checks for server down and throttles reconnects. --- diff --git a/cachedb/redis.c b/cachedb/redis.c index 3dfa95859..cebfddfd5 100644 --- a/cachedb/redis.c +++ b/cachedb/redis.c @@ -46,6 +46,8 @@ #include "cachedb/cachedb.h" #include "util/alloc.h" #include "util/config_file.h" +#include "util/locks.h" +#include "util/timeval_func.h" #include "sldns/sbuffer.h" #ifdef USE_REDIS @@ -75,6 +77,18 @@ struct redis_moddata { /* timeout for connection setup */ struct timeval connect_timeout; struct timeval replica_connect_timeout; + /* the reconnect interval time. */ + struct timeval reconnect_interval; + struct timeval replica_reconnect_interval; + /* reconnect attempts, 0 if connected, counts up failed reconnects. */ + int reconnect_attempts; + int replica_reconnect_attempts; + /* Lock on reconnect_wait time. */ + lock_basic_type wait_lock; + lock_basic_type replica_wait_lock; + /* reconnect wait time, wait until it has passed before reconnect. */ + struct timeval reconnect_wait; + struct timeval replica_reconnect_wait; /* the redis logical database to use */ int logical_db; int replica_logical_db; @@ -82,6 +96,10 @@ struct redis_moddata { int set_with_ex_available; }; +/** The limit on the number of redis connect attempts. After failure if + * the number is exceeded, the reconnects are throttled by the wait time. */ +#define REDIS_RECONNECT_ATTEMPT_LIMIT 3 + static redisReply* redis_command(struct module_env*, struct cachedb_env*, const char*, const uint8_t*, size_t, int); @@ -105,6 +123,8 @@ moddata_clean(struct redis_moddata** moddata) { } free((*moddata)->replica_ctxs); } + lock_basic_destroy(&(*moddata)->wait_lock); + lock_basic_destroy(&(*moddata)->replica_wait_lock); free(*moddata); *moddata = NULL; } @@ -113,10 +133,31 @@ static redisContext* redis_connect(const char* host, int port, const char* path, const char* password, int logical_db, const struct timeval connect_timeout, - const struct timeval command_timeout) + const struct timeval command_timeout, + const struct timeval* reconnect_interval, + int* reconnect_attempts, + struct timeval* reconnect_wait, + lock_basic_type* wait_lock, + struct timeval* now_tv, + const char* infostr) { redisContext* ctx; + /* See if the redis server is down, and reconnect has to wait. */ + if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) { + /* Acquire lock to look at timeval, the integer has atomic + * integrity. */ + struct timeval wait_tv; + lock_basic_lock(wait_lock); + wait_tv = *reconnect_wait; + lock_basic_unlock(wait_lock); + if(timeval_smaller(now_tv, &wait_tv)) { + verbose(VERB_ALGO, "redis %sdown, reconnect wait", + infostr); + return NULL; + } + } + if(path && path[0]!=0) { ctx = redisConnectUnixWithTimeout(path, connect_timeout); } else { @@ -126,18 +167,18 @@ redis_connect(const char* host, int port, const char* path, const char *errstr = "out of memory"; if(ctx) errstr = ctx->errstr; - log_err("failed to connect to redis server: %s", errstr); + log_err("failed to connect to redis %sserver: %s", infostr, errstr); goto fail; } if(redisSetTimeout(ctx, command_timeout) != REDIS_OK) { - log_err("failed to set redis timeout, %s", ctx->errstr); + log_err("failed to set redis %stimeout, %s", infostr, ctx->errstr); goto fail; } if(password && password[0]!=0) { redisReply* rep; rep = redisCommand(ctx, "AUTH %s", password); if(!rep || rep->type == REDIS_REPLY_ERROR) { - log_err("failed to authenticate with password"); + log_err("failed to authenticate %swith password", infostr); freeReplyObject(rep); goto fail; } @@ -147,18 +188,20 @@ redis_connect(const char* host, int port, const char* path, redisReply* rep; rep = redisCommand(ctx, "SELECT %d", logical_db); if(!rep || rep->type == REDIS_REPLY_ERROR) { - log_err("failed to set logical database (%d)", - logical_db); + log_err("failed %sto set logical database (%d)", + infostr, logical_db); freeReplyObject(rep); goto fail; } freeReplyObject(rep); } + *reconnect_attempts = 0; if(verbosity >= VERB_OPS) { char port_str[6+1]; port_str[0] = ' '; (void)snprintf(port_str+1, sizeof(port_str)-1, "%d", port); - verbose(VERB_OPS, "Connection to Redis established (%s%s)", + verbose(VERB_OPS, "Connection to Redis %sestablished (%s%s)", + infostr, path&&path[0]!=0?path:host, path&&path[0]!=0?"":port_str); } @@ -167,6 +210,18 @@ redis_connect(const char* host, int port, const char* path, fail: if(ctx) redisFree(ctx); + (*reconnect_attempts)++; + if(*reconnect_attempts > REDIS_RECONNECT_ATTEMPT_LIMIT) { + /* Wait for the reconnect interval before trying again. */ + struct timeval tv; + tv = *now_tv; + timeval_add(&tv, reconnect_interval); + lock_basic_lock(wait_lock); + *reconnect_wait = tv; + lock_basic_unlock(wait_lock); + verbose(VERB_ALGO, "redis %sreconnect wait until %d.%6.6d", + infostr, (int)tv.tv_sec, (int)tv.tv_usec); + } return NULL; } @@ -191,6 +246,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) log_err("out of memory"); goto fail; } + lock_basic_init(&moddata->wait_lock); + lock_protect(&moddata->wait_lock, &moddata->reconnect_wait, + sizeof(moddata->reconnect_wait)); + lock_basic_init(&moddata->replica_wait_lock); + lock_protect(&moddata->replica_wait_lock, + &moddata->replica_reconnect_wait, + sizeof(moddata->replica_reconnect_wait)); moddata->numctxs = env->cfg->num_threads; /* note: server_host and similar string configuration options are * shallow references to configured strings; we don't have to free them @@ -219,6 +281,8 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) set_timeout(&moddata->replica_connect_timeout, env->cfg->redis_replica_timeout, env->cfg->redis_replica_connect_timeout); + set_timeout(&moddata->reconnect_interval, 1000, 0); + set_timeout(&moddata->replica_reconnect_interval, 1000, 0); moddata->logical_db = env->cfg->redis_logical_db; moddata->replica_logical_db = env->cfg->redis_replica_logical_db; @@ -245,7 +309,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) moddata->server_password, moddata->logical_db, moddata->connect_timeout, - moddata->command_timeout); + moddata->command_timeout, + &moddata->reconnect_interval, + &moddata->reconnect_attempts, + &moddata->reconnect_wait, + &moddata->wait_lock, + env->now_tv, + ""); if(!ctx) { log_err("redis_init: failed to init redis " "(for thread %d)", i); @@ -263,7 +333,13 @@ redis_init(struct module_env* env, struct cachedb_env* cachedb_env) moddata->replica_server_password, moddata->replica_logical_db, moddata->replica_connect_timeout, - moddata->replica_command_timeout); + moddata->replica_command_timeout, + &moddata->replica_reconnect_interval, + &moddata->replica_reconnect_attempts, + &moddata->replica_reconnect_wait, + &moddata->replica_wait_lock, + env->now_tv, + "replica "); if(!ctx) { log_err("redis_init: failed to init redis " "replica (for thread %d)", i); @@ -364,7 +440,13 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env, d->replica_server_password, d->replica_logical_db, d->replica_connect_timeout, - d->replica_command_timeout); + d->replica_command_timeout, + &d->replica_reconnect_interval, + &d->replica_reconnect_attempts, + &d->replica_reconnect_wait, + &d->replica_wait_lock, + env->now_tv, + "replica "); } else { ctx = redis_connect( d->server_host, @@ -373,7 +455,13 @@ redis_command(struct module_env* env, struct cachedb_env* cachedb_env, d->server_password, d->logical_db, d->connect_timeout, - d->command_timeout); + d->command_timeout, + &d->reconnect_interval, + &d->reconnect_attempts, + &d->reconnect_wait, + &d->wait_lock, + env->now_tv, + ""); } ctx_selector[env->alloc->thread_num] = ctx; }