From: Arran Cudbard-Bell Date: Sat, 27 Jun 2015 21:04:53 +0000 (-0400) Subject: Re-order connection pool functions X-Git-Tag: release_3_0_9~50 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=81b852c22dbf0325fa8b1cd0de323102c55648d2;p=thirdparty%2Ffreeradius-server.git Re-order connection pool functions Public functions at the bottom of connection.c, private ones at the top, group by pool and connection functions. --- diff --git a/src/include/connection.h b/src/include/connection.h index 4714eaa3489..475386ce55e 100644 --- a/src/include/connection.h +++ b/src/include/connection.h @@ -70,31 +70,50 @@ typedef void *(*fr_connection_create_t)(TALLOC_CTX *ctx, void *opaque); */ typedef int (*fr_connection_alive_t)(void *opaque, void *connection); -fr_connection_pool_t *fr_connection_pool_module_init(CONF_SECTION *module, - void *opaque, - fr_connection_create_t c, - fr_connection_alive_t a, - char const *prefix); - -fr_connection_pool_t *fr_connection_pool_init(TALLOC_CTX *ctx, - CONF_SECTION *cs, - void *opaque, - fr_connection_create_t c, - fr_connection_alive_t a, - char const *log_prefix, - char const *trigger_prefix); - -fr_connection_pool_t *fr_connection_pool_copy(TALLOC_CTX *ctx, fr_connection_pool_t *pool, void *opaque); -int fr_connection_pool_reconnect(fr_connection_pool_t *pool); - -void fr_connection_pool_free(fr_connection_pool_t *pool); - -void *fr_connection_get(fr_connection_pool_t *pool); -int fr_connection_get_num(fr_connection_pool_t *pool); -void *fr_connection_get_opaque(fr_connection_pool_t *pool); -void fr_connection_release(fr_connection_pool_t *pool, void *conn); -void *fr_connection_reconnect(fr_connection_pool_t *pool, void *conn); -int fr_connection_del(fr_connection_pool_t *pool, void *conn); +/* + * Pool allocation/initialisation + */ +fr_connection_pool_t *fr_connection_pool_init(TALLOC_CTX *ctx, + CONF_SECTION *cs, + void *opaque, + fr_connection_create_t c, + fr_connection_alive_t a, + char const *log_prefix, + char const *trigger_prefix); + +fr_connection_pool_t *fr_connection_pool_module_init(CONF_SECTION *module, + void *opaque, + fr_connection_create_t c, + fr_connection_alive_t a, + char const *prefix); + +fr_connection_pool_t *fr_connection_pool_copy(TALLOC_CTX *ctx, fr_connection_pool_t *pool, void *opaque); + + +/* + * Pool getters + */ +int fr_connection_pool_get_num(fr_connection_pool_t *pool); + +void *fr_connection_pool_get_opaque(fr_connection_pool_t *pool); + +/* + * Pool management + */ +int fr_connection_pool_reconnect(fr_connection_pool_t *pool); + +void fr_connection_pool_free(fr_connection_pool_t *pool); + +/* + * Connection lifecycle + */ +void *fr_connection_get(fr_connection_pool_t *pool); + +void fr_connection_release(fr_connection_pool_t *pool, void *conn); + +void *fr_connection_reconnect(fr_connection_pool_t *pool, void *conn); + +int fr_connection_close_internal(fr_connection_pool_t *pool, void *conn); #ifdef __cplusplus } diff --git a/src/main/connection.c b/src/main/connection.c index 1fc0766e03e..2d61a171184 100644 --- a/src/main/connection.c +++ b/src/main/connection.c @@ -168,7 +168,39 @@ static const CONF_PARSER connection_config[] = { { NULL, -1, 0, NULL, NULL } }; -static void *fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn); +static fr_connection_t *fr_connection_reconnect_internal(fr_connection_pool_t *pool, fr_connection_t *conn); + +/** Order connections by reserved most recently + */ +static int last_reserved_cmp(void const *one, void const *two) +{ + fr_connection_t const *a = one; + fr_connection_t const *b = two; + + if (a->last_reserved.tv_sec < b->last_reserved.tv_sec) return -1; + if (a->last_reserved.tv_sec > b->last_reserved.tv_sec) return +1; + + if (a->last_reserved.tv_usec < b->last_reserved.tv_usec) return -1; + if (a->last_reserved.tv_usec > b->last_reserved.tv_usec) return +1; + + return 0; +} + +/** Order connections by released longest ago + */ +static int last_released_cmp(void const *one, void const *two) +{ + fr_connection_t const *a = one; + fr_connection_t const *b = two; + + if (b->last_released.tv_sec < a->last_released.tv_sec) return -1; + if (b->last_released.tv_sec > a->last_released.tv_sec) return +1; + + if (b->last_released.tv_usec < a->last_released.tv_usec) return -1; + if (b->last_released.tv_usec > a->last_released.tv_usec) return +1; + + return 0; +} /** Removes a connection from the connection list * @@ -240,6 +272,52 @@ static void fr_connection_exec_trigger(fr_connection_pool_t *pool, char const *n exec_trigger(NULL, pool->cs, name, true); } +/** Find a connection handle in the connection list + * + * Walks over the list of connections searching for a specified connection + * handle and returns the first connection that contains that pointer. + * + * @note Will lock mutex and only release mutex if connection handle + * is not found, so will usually return will mutex held. + * @note Must be called with the mutex free. + * + * @param[in] pool to search in. + * @param[in] conn handle to search for. + * @return + * - Connection containing the specified handle. + * - NULL if non if connection was found. + */ +static fr_connection_t *fr_connection_find(fr_connection_pool_t *pool, void *conn) +{ + fr_connection_t *this; + + if (!pool || !conn) return NULL; + + pthread_mutex_lock(&pool->mutex); + + /* + * FIXME: This loop could be avoided if we passed a 'void + * **connection' instead. We could use "offsetof" in + * order to find top of the parent structure. + */ + for (this = pool->head; this != NULL; this = this->next) { + if (this->connection == conn) { +#ifdef PTHREAD_DEBUG + pthread_t pthread_id; + + pthread_id = pthread_self(); + rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0); +#endif + + rad_assert(this->in_use == true); + return this; + } + } + + pthread_mutex_unlock(&pool->mutex); + return NULL; +} + /** Spawns a new connection * * Spawns a new connection using the create callback, and returns it for @@ -438,7 +516,7 @@ static fr_connection_t *fr_connection_spawn(fr_connection_pool_t *pool, time_t n * @param[in,out] pool to modify. * @param[in,out] this Connection to delete. */ -static void fr_connection_close(fr_connection_pool_t *pool, fr_connection_t *this) +static void fr_connection_close_internal(fr_connection_pool_t *pool, fr_connection_t *this) { /* * If it's in use, release it. @@ -470,337 +548,386 @@ static void fr_connection_close(fr_connection_pool_t *pool, fr_connection_t *thi talloc_free(this); } - -/** Reconnect a suspected inviable connection +/** Check whether a connection needs to be removed from the pool * - * @note Must be called with the mutex held, will not release mutex. + * Will verify that the connection is within idle_timeout, max_uses, and + * lifetime values. If it is not, the connection will be closed. * - * @see fr_connection_get - * @param[in,out] pool to reconnect the connection in. - * @param[in,out] conn to reconnect. - * @return new connection handle if successful else NULL. + * @note Will only close connections not in use. + * @note Must be called with the mutex held. + * + * @param[in,out] pool to modify. + * @param[in,out] this Connection to manage. + * @param[in] now Current time. + * @return + * - 0 if connection was closed. + * - 1 if connection handle was left open. */ -static fr_connection_t *fr_connection_reconnect_internal(fr_connection_pool_t *pool, fr_connection_t *conn) +static int fr_connection_manage(fr_connection_pool_t *pool, + fr_connection_t *this, + time_t now) { - void *new_conn; - uint64_t conn_number; - TALLOC_CTX *ctx; - - conn_number = conn->number; - - /* - * Destroy any handles associated with the fr_connection_t - */ - talloc_free_children(conn); - - DEBUG("%s: Reconnecting (%" PRIu64 ")", pool->log_prefix, conn_number); + rad_assert(pool != NULL); + rad_assert(this != NULL); /* - * Allocate a new top level ctx for the create callback - * to hang its memory off of. + * Don't terminated in-use connections */ - ctx = talloc_init("fr_connection_ctx"); - if (!ctx) return NULL; - fr_link_talloc_ctx_free(conn, ctx); - - new_conn = pool->create(ctx, pool->opaque); - if (!new_conn) { - /* - * We can't create a new connection, so close the current one. - */ - fr_connection_close(pool, conn); - - /* - * Maybe there's a connection which is unused and - * available. If so, return it. - */ - new_conn = fr_connection_get_internal(pool, false); - if (new_conn) return new_conn; + if (this->in_use) return 1; - RATE_LIMIT(ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available", - pool->log_prefix, conn_number)); + if ((pool->max_uses > 0) && + (this->num_uses >= pool->max_uses)) { + DEBUG("%s: Closing expired connection (%" PRIu64 "): Hit max_uses limit", pool->log_prefix, + this->number); + do_delete: + if (pool->num <= pool->min) { + RATE_LIMIT(WARN("%s: You probably need to lower \"min\"", pool->log_prefix)); + } + fr_connection_close_internal(pool, this); + return 0; + } - return NULL; + if ((pool->lifetime > 0) && + ((this->created + pool->lifetime) < now)) { + DEBUG("%s: Closing expired connection (%" PRIu64 "): Hit lifetime limit", + pool->log_prefix, this->number); + goto do_delete; } - fr_connection_exec_trigger(pool, "close"); - conn->connection = new_conn; - conn->needs_reconnecting = false; + if ((pool->idle_timeout > 0) && + ((this->last_released.tv_sec + pool->idle_timeout) < now)) { + INFO("%s: Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %u seconds", + pool->log_prefix, this->number, (int) (now - this->last_released.tv_sec)); + goto do_delete; + } - return new_conn; + return 1; } -/** Find a connection handle in the connection list + +/** Check whether any connections need to be removed from the pool * - * Walks over the list of connections searching for a specified connection - * handle and returns the first connection that contains that pointer. + * Maintains the number of connections in the pool as per the configuration + * parameters for the connection pool. * - * @note Will lock mutex and only release mutex if connection handle - * is not found, so will usually return will mutex held. - * @note Must be called with the mutex free. + * @note Will only run checks the first time it's called in a given second, + * to throttle connection spawning/closing. + * @note Will only close connections not in use. + * @note Must be called with the mutex held, will release mutex before + * returning. * - * @param[in] pool to search in. - * @param[in] conn handle to search for. - * @return - * - Connection containing the specified handle. - * - NULL if non if connection was found. + * @param[in,out] pool to manage. + * @return 1 */ -static fr_connection_t *fr_connection_find(fr_connection_pool_t *pool, void *conn) +static int fr_connection_pool_check(fr_connection_pool_t *pool) { - fr_connection_t *this; + uint32_t spawn, idle, extra; + time_t now = time(NULL); + fr_connection_t *this, *next; - if (!pool || !conn) return NULL; + if (pool->last_checked == now) { + pthread_mutex_unlock(&pool->mutex); + return 1; + } - pthread_mutex_lock(&pool->mutex); + /* + * Some idle connections are OK, if they're within the + * configured "spare" range. Any extra connections + * outside of that range can be closed. + */ + idle = pool->num - pool->active; + if (idle <= pool->spare) { + extra = 0; + } else { + extra = idle - pool->spare; + } /* - * FIXME: This loop could be avoided if we passed a 'void - * **connection' instead. We could use "offsetof" in - * order to find top of the parent structure. + * The other end can close connections. If so, we'll + * have fewer than "min". When that happens, open more + * connections to enforce "min". */ - for (this = pool->head; this != NULL; this = this->next) { - if (this->connection == conn) { -#ifdef PTHREAD_DEBUG - pthread_t pthread_id; + if ((pool->num + pool->pending) <= pool->min) { + spawn = pool->min - (pool->num + pool->pending); + extra = 0; - pthread_id = pthread_self(); - rad_assert(pthread_equal(this->pthread_id, pthread_id) != 0); -#endif + /* + * If we're about to create more than "max", + * don't create more. + */ + } else if ((pool->num + pool->pending) >= pool->max) { + /* + * Ensure we don't spawn more connections. If + * there are extra idle connections, we can + * delete all of them. + */ + spawn = 0; + /* leave extra alone from above */ - rad_assert(this->in_use == true); - return this; + /* + * min < num < max + * + * AND we don't have enough idle connections. + * Open some more. + */ + } else if (idle <= pool->spare) { + /* + * Not enough spare connections. Spawn a few. + * But cap the pool size at "max" + */ + spawn = pool->spare - idle; + extra = 0; + + if ((pool->num + pool->pending + spawn) > pool->max) { + spawn = pool->max - (pool->num + pool->pending); } - } - pthread_mutex_unlock(&pool->mutex); - return NULL; -} + /* + * min < num < max + * + * We have more than enough idle connections, AND + * some are pending. Don't open or close any. + */ + } else if (pool->pending) { + spawn = 0; + extra = 0; -/** Delete a connection from the connection pool. - * - * Resolves the connection handle to a connection, then (if found) - * closes, unlinks and frees that connection. - * - * @note Must be called with the mutex free. - * - * @param[in,out] pool Connection pool to modify. - * @param[in] conn to delete. - * @return - * - 0 If the connection could not be found. - * - 1 if the connection was deleted. - */ -int fr_connection_del(fr_connection_pool_t *pool, void *conn) -{ - fr_connection_t *this; + /* + * We have too many idle connections, but closing + * some would take us below "min", so we only + * close enough to take us to "min". + */ + } else if ((pool->min + extra) >= pool->num) { + spawn = 0; + extra = pool->num - pool->min; - this = fr_connection_find(pool, conn); - if (!this) return 0; + } else { + /* + * Closing the "extra" connections won't take us + * below "min". It's therefore safe to close + * them all. + */ + spawn = 0; + /* leave extra alone from above */ + } - INFO("%s: Deleting connection (%" PRIu64 ")", pool->log_prefix, this->number); + /* + * Only try to open spares if we're not already attempting to open + * a connection. Avoids spurious log messages. + */ + if (spawn) { + INFO("%s: %i of %u connections in use. Need more spares", pool->log_prefix, pool->active, pool->num); + pthread_mutex_unlock(&pool->mutex); + fr_connection_spawn(pool, now, false); /* ignore return code */ + pthread_mutex_lock(&pool->mutex); + } - fr_connection_close(pool, this); - fr_connection_pool_check(pool); - return 1; -} + /* + * We haven't spawned connections in a while, and there + * are too many spare ones. Close the one which has been + * unused for the longest. + */ + if (extra && (now >= (pool->last_spawned + pool->delay_interval))) { + fr_connection_t *found; -/** Delete a connection pool - * - * Closes, unlinks and frees all connections in the connection pool, then frees - * all memory used by the connection pool. - * - * @note Will call the 'stop' trigger. - * @note Must be called with the mutex free. - * - * @param[in,out] pool to delete. - */ -void fr_connection_pool_free(fr_connection_pool_t *pool) -{ - fr_connection_t *this; + found = NULL; + for (this = pool->tail; this != NULL; this = this->prev) { + if (this->in_use) continue; - if (!pool) return; + if (!found || + timercmp(&this->last_reserved, &found->last_reserved, <)) { + found = this; + } + } - /* - * More modules hold a reference to this pool, don't free - * it yet. - */ - if (pool->ref > 0) { - pool->ref--; - return; - } + rad_assert(found != NULL); - DEBUG("%s: Removing connection pool", pool->log_prefix); + INFO("%s: Closing connection (%" PRIu64 "), from %d unused connections", pool->log_prefix, + found->number, extra); + fr_connection_close_internal(pool, found); - pthread_mutex_lock(&pool->mutex); + /* + * Decrease the delay for the next time we clean + * up. + */ + pool->next_delay >>= 1; + if (pool->next_delay == 0) pool->next_delay = 1; + pool->delay_interval += pool->next_delay; + } /* - * Don't loop over the list. Just keep removing the head - * until they're all gone. + * Pass over all of the connections in the pool, limiting + * lifetime, idle time, max requests, etc. */ - while ((this = pool->head) != NULL) { - INFO("%s: Closing connection (%" PRIu64 ")", pool->log_prefix, this->number); - - fr_connection_close(pool, this); + for (this = pool->head; this != NULL; this = next) { + next = this->next; + fr_connection_manage(pool, this, now); } - fr_heap_delete(pool->heap); - - fr_connection_exec_trigger(pool, "stop"); - - rad_assert(pool->head == NULL); - rad_assert(pool->tail == NULL); - rad_assert(pool->num == 0); + pool->last_checked = now; + pthread_mutex_unlock(&pool->mutex); - talloc_free(pool); + return 1; } -/** Initialise a module specific connection pool - * - * @see fr_connection_pool_init +/** Get a connection from the connection pool * - * @param[in] module section. - * @param[in] opaque data pointer to pass to callbacks. - * @param[in] c Callback to create new connections. - * @param[in] a Callback to check the status of connections. - * @param[in] log_prefix override, if NULL will be set automatically from the module CONF_SECTION. + * @param[in,out] pool to reserve the connection from. + * @param[in] spawn whether to spawn a new connection * @return - * - New connection pool. + * - A pointer to the connection handle. * - NULL on error. */ -fr_connection_pool_t *fr_connection_pool_module_init(CONF_SECTION *module, - void *opaque, - fr_connection_create_t c, - fr_connection_alive_t a, - char const *log_prefix) +static void *fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn) { - CONF_SECTION *cs, *mycs; - char buff[128]; - char trigger_prefix[64]; - - fr_connection_pool_t *pool; - char const *cs_name1, *cs_name2; - - int ret; + time_t now; + fr_connection_t *this; -#define CONNECTION_POOL_CF_KEY "connection_pool" -#define parent_name(_x) cf_section_name(cf_item_parent(cf_section_to_item(_x))) + if (!pool) return NULL; - cs_name1 = cf_section_name1(module); - cs_name2 = cf_section_name2(module); - if (!cs_name2) cs_name2 = cs_name1; + pthread_mutex_lock(&pool->mutex); - snprintf(trigger_prefix, sizeof(trigger_prefix), "modules.%s.", cs_name1); + now = time(NULL); - if (!log_prefix) { - snprintf(buff, sizeof(buff), "rlm_%s (%s)", cs_name1, cs_name2); - log_prefix = buff; - } + /* + * Grab the link with the lowest latency, and check it + * for limits. If "connection manage" says the link is + * no longer usable, go grab another one. + */ + do { + this = fr_heap_peek(pool->heap); + if (!this) break; + } while (!fr_connection_manage(pool, this, now)); /* - * Get sibling's pool config section + * We have a working connection. Extract it from the + * heap and use it. */ - ret = find_module_sibling_section(&cs, module, "pool"); - switch (ret) { - case -1: - return NULL; + if (this) { + /* + * Unless it needs reconnecting, in which + * case attempt to reconnect it. + */ + if (this->needs_reconnecting && !(this = fr_connection_reconnect_internal(pool, this))) { + pthread_mutex_unlock(&pool->mutex); - case 1: - DEBUG4("%s: Using pool section from \"%s\"", log_prefix, parent_name(cs)); - break; + ERROR("%s: Connection was marked for reconnection, but re-establishing connection failed", + pool->log_prefix); - case 0: - DEBUG4("%s: Using local pool section", log_prefix); - break; + return NULL; + } + fr_heap_extract(pool->heap, this); + goto do_return; } /* - * Get our pool config section + * We don't have a connection. Try to open a new one. */ - mycs = cf_section_sub_find(module, "pool"); - if (!mycs) { - DEBUG4("%s: Adding pool section to config item \"%s\" to store pool references", log_prefix, - cf_section_name(module)); + rad_assert(pool->active == pool->num); - mycs = cf_section_alloc(module, "pool", NULL); - cf_section_add(module, mycs); - } + if (pool->num == pool->max) { + bool complain = false; - /* - * Sibling didn't have a pool config section - * Use our own local pool. - */ - if (!cs) { - DEBUG4("%s: \"%s.pool\" section not found, using \"%s.pool\"", log_prefix, - parent_name(cs), parent_name(mycs)); - cs = mycs; - } + /* + * Rate-limit complaints. + */ + if (pool->last_at_max != now) { + complain = true; + pool->last_at_max = now; + } - /* - * If fr_connection_pool_init has already been called - * for this config section, reuse the previous instance. - * - * This allows modules to pass in the config sections - * they would like to use the connection pool from. - */ - pool = cf_data_find(cs, CONNECTION_POOL_CF_KEY); - if (!pool) { - DEBUG4("%s: No pool reference found for config item \"%s.pool\"", log_prefix, parent_name(cs)); - pool = fr_connection_pool_init(cs, cs, opaque, c, a, log_prefix, trigger_prefix); - if (!pool) return NULL; + pthread_mutex_unlock(&pool->mutex); - DEBUG4("%s: Adding pool reference %p to config item \"%s.pool\"", log_prefix, pool, parent_name(cs)); - cf_data_add(cs, CONNECTION_POOL_CF_KEY, pool, NULL); - return pool; + if (!RATE_LIMIT_ENABLED || complain) { + ERROR("%s: No connections available and at max connection limit", pool->log_prefix); + } + + return NULL; } - pool->ref++; - DEBUG4("%s: Found pool reference %p in config item \"%s.pool\"", log_prefix, pool, parent_name(cs)); + pthread_mutex_unlock(&pool->mutex); - /* - * We're reusing pool data add it to our local config - * section. This allows other modules to transitively - * re-use a pool through this module. - */ - if (mycs != cs) { - DEBUG4("%s: Copying pool reference %p from config item \"%s.pool\" to config item \"%s.pool\"", - log_prefix, pool, parent_name(cs), parent_name(mycs)); - cf_data_add(mycs, CONNECTION_POOL_CF_KEY, pool, NULL); - } + if (!spawn) return NULL; - return pool; -} + DEBUG("%s: %i of %u connections in use. You may need to increase \"spare\"", pool->log_prefix, + pool->active, pool->num); + this = fr_connection_spawn(pool, now, true); /* MY connection! */ + if (!this) return NULL; + pthread_mutex_lock(&pool->mutex); -/* - * Order connections by reserved most recently - */ -static int last_reserved_cmp(void const *one, void const *two) -{ - fr_connection_t const *a = one; - fr_connection_t const *b = two; +do_return: + pool->active++; + this->num_uses++; + gettimeofday(&this->last_reserved, NULL); + this->in_use = true; - if (a->last_reserved.tv_sec < b->last_reserved.tv_sec) return -1; - if (a->last_reserved.tv_sec > b->last_reserved.tv_sec) return +1; +#ifdef PTHREAD_DEBUG + this->pthread_id = pthread_self(); +#endif + pthread_mutex_unlock(&pool->mutex); - if (a->last_reserved.tv_usec < b->last_reserved.tv_usec) return -1; - if (a->last_reserved.tv_usec > b->last_reserved.tv_usec) return +1; + DEBUG("%s: Reserved connection (%" PRIu64 ")", pool->log_prefix, this->number); - return 0; + return this->connection; } -/* - * Order connections by released least recently +/** Reconnect a suspected inviable connection + * + * @note Must be called with the mutex held, will not release mutex. + * + * @see fr_connection_get + * @param[in,out] pool to reconnect the connection in. + * @param[in,out] conn to reconnect. + * @return new connection handle if successful else NULL. */ -static int last_released_cmp(void const *one, void const *two) +static fr_connection_t *fr_connection_reconnect_internal(fr_connection_pool_t *pool, fr_connection_t *conn) { - fr_connection_t const *a = one; - fr_connection_t const *b = two; + void *new_conn; + uint64_t conn_number; + TALLOC_CTX *ctx; - if (b->last_released.tv_sec < a->last_released.tv_sec) return -1; - if (b->last_released.tv_sec > a->last_released.tv_sec) return +1; + conn_number = conn->number; - if (b->last_released.tv_usec < a->last_released.tv_usec) return -1; - if (b->last_released.tv_usec > a->last_released.tv_usec) return +1; + /* + * Destroy any handles associated with the fr_connection_t + */ + talloc_free_children(conn); - return 0; + DEBUG("%s: Reconnecting (%" PRIu64 ")", pool->log_prefix, conn_number); + + /* + * Allocate a new top level ctx for the create callback + * to hang its memory off of. + */ + ctx = talloc_init("fr_connection_ctx"); + if (!ctx) return NULL; + fr_link_talloc_ctx_free(conn, ctx); + + new_conn = pool->create(ctx, pool->opaque); + if (!new_conn) { + /* + * We can't create a new connection, so close the current one. + */ + fr_connection_close_internal(pool, conn); + + /* + * Maybe there's a connection which is unused and + * available. If so, return it. + */ + new_conn = fr_connection_get_internal(pool, false); + if (new_conn) return new_conn; + + RATE_LIMIT(ERROR("%s: Failed to reconnect (%" PRIu64 "), no free connections are available", + pool->log_prefix, conn_number)); + + return NULL; + } + + fr_connection_exec_trigger(pool, "close"); + conn->connection = new_conn; + conn->needs_reconnecting = false; + + return new_conn; } /** Create a new connection pool @@ -977,398 +1104,259 @@ fr_connection_pool_t *fr_connection_pool_init(TALLOC_CTX *ctx, return pool; } -/** Allocate a new pool using an existing one as a template +/** Initialise a module specific connection pool * - * @param ctx to allocate new pool in. - * @param pool to copy. - * @param opaque data to pass to connection function. + * @see fr_connection_pool_init + * + * @param[in] module section. + * @param[in] opaque data pointer to pass to callbacks. + * @param[in] c Callback to create new connections. + * @param[in] a Callback to check the status of connections. + * @param[in] log_prefix override, if NULL will be set automatically from the module CONF_SECTION. * @return * - New connection pool. * - NULL on error. */ -fr_connection_pool_t *fr_connection_pool_copy(TALLOC_CTX *ctx, fr_connection_pool_t *pool, void *opaque) +fr_connection_pool_t *fr_connection_pool_module_init(CONF_SECTION *module, + void *opaque, + fr_connection_create_t c, + fr_connection_alive_t a, + char const *log_prefix) { - return fr_connection_pool_init(ctx, pool->cs, opaque, pool->create, - pool->alive, pool->log_prefix, pool->trigger_prefix); -} + CONF_SECTION *cs, *mycs; + char buff[128]; + char trigger_prefix[64]; -/** Mark connections for reconnection, and spawn at least 'start' connections - * - * This intended to be called on a connection pool that's in use, to have it reflect - * a configuration change, or because the administrator knows that all connections - * in the pool are inviable and need to be reconnected. - * - * @param[in] pool to reconnect. - * @return - * - 0 On success. - * - -1 If we couldn't create start connections, this may be ignored - * depending on the context in which this function is being called. - */ -int fr_connection_pool_reconnect(fr_connection_pool_t *pool) -{ - uint32_t i; - fr_connection_t *this; - time_t now; + fr_connection_pool_t *pool; + char const *cs_name1, *cs_name2; - /* - * Mark all connections in the pool as requiring - * reconnection. - */ - pthread_mutex_lock(&pool->mutex); - for (this = pool->head; this; this = this->next) this->needs_reconnecting = true; + int ret; - /* - * We want to ensure at least 'start' connections - * have been reconnected. We can't call reconnect - * because, we might get the same connection each - * time we reserve one, so we close 'start' - * connections, and then attempt to spawn them again. - */ - for (i = 0; i < pool->start; i++) { - this = fr_heap_peek(pool->heap); - if (!this) break; /* There wasn't 'start' connections available */ +#define CONNECTION_POOL_CF_KEY "connection_pool" +#define parent_name(_x) cf_section_name(cf_item_parent(cf_section_to_item(_x))) - fr_connection_close(pool, this); - } - pthread_mutex_unlock(&pool->mutex); + cs_name1 = cf_section_name1(module); + cs_name2 = cf_section_name2(module); + if (!cs_name2) cs_name2 = cs_name1; - now = time(NULL); + snprintf(trigger_prefix, sizeof(trigger_prefix), "modules.%s.", cs_name1); - /* - * Now attempt to spawn 'start' connections. - */ - for (i = 0; i < pool->start; i++) { - this = fr_connection_spawn(pool, now, false); - if (!this) return -1; + if (!log_prefix) { + snprintf(buff, sizeof(buff), "rlm_%s (%s)", cs_name1, cs_name2); + log_prefix = buff; } - return 0; -} - - -/** Check whether a connection needs to be removed from the pool - * - * Will verify that the connection is within idle_timeout, max_uses, and - * lifetime values. If it is not, the connection will be closed. - * - * @note Will only close connections not in use. - * @note Must be called with the mutex held. - * - * @param[in,out] pool to modify. - * @param[in,out] this Connection to manage. - * @param[in] now Current time. - * @return - * - 0 if connection was closed. - * - 1 if connection handle was left open. - */ -static int fr_connection_manage(fr_connection_pool_t *pool, - fr_connection_t *this, - time_t now) -{ - rad_assert(pool != NULL); - rad_assert(this != NULL); - /* - * Don't terminated in-use connections + * Get sibling's pool config section */ - if (this->in_use) return 1; - - if ((pool->max_uses > 0) && - (this->num_uses >= pool->max_uses)) { - DEBUG("%s: Closing expired connection (%" PRIu64 "): Hit max_uses limit", pool->log_prefix, - this->number); - do_delete: - if (pool->num <= pool->min) { - RATE_LIMIT(WARN("%s: You probably need to lower \"min\"", pool->log_prefix)); - } - fr_connection_close(pool, this); - return 0; - } - - if ((pool->lifetime > 0) && - ((this->created + pool->lifetime) < now)) { - DEBUG("%s: Closing expired connection (%" PRIu64 "): Hit lifetime limit", - pool->log_prefix, this->number); - goto do_delete; - } - - if ((pool->idle_timeout > 0) && - ((this->last_released.tv_sec + pool->idle_timeout) < now)) { - INFO("%s: Closing connection (%" PRIu64 "): Hit idle_timeout, was idle for %u seconds", - pool->log_prefix, this->number, (int) (now - this->last_released.tv_sec)); - goto do_delete; - } - - return 1; -} - - -/** Check whether any connections need to be removed from the pool - * - * Maintains the number of connections in the pool as per the configuration - * parameters for the connection pool. - * - * @note Will only run checks the first time it's called in a given second, - * to throttle connection spawning/closing. - * @note Will only close connections not in use. - * @note Must be called with the mutex held, will release mutex before - * returning. - * - * @param[in,out] pool to manage. - * @return 1 - */ -static int fr_connection_pool_check(fr_connection_pool_t *pool) -{ - uint32_t spawn, idle, extra; - time_t now = time(NULL); - fr_connection_t *this, *next; + ret = find_module_sibling_section(&cs, module, "pool"); + switch (ret) { + case -1: + return NULL; - if (pool->last_checked == now) { - pthread_mutex_unlock(&pool->mutex); - return 1; - } + case 1: + DEBUG4("%s: Using pool section from \"%s\"", log_prefix, parent_name(cs)); + break; - /* - * Some idle connections are OK, if they're within the - * configured "spare" range. Any extra connections - * outside of that range can be closed. - */ - idle = pool->num - pool->active; - if (idle <= pool->spare) { - extra = 0; - } else { - extra = idle - pool->spare; + case 0: + DEBUG4("%s: Using local pool section", log_prefix); + break; } /* - * The other end can close connections. If so, we'll - * have fewer than "min". When that happens, open more - * connections to enforce "min". - */ - if ((pool->num + pool->pending) <= pool->min) { - spawn = pool->min - (pool->num + pool->pending); - extra = 0; - - /* - * If we're about to create more than "max", - * don't create more. - */ - } else if ((pool->num + pool->pending) >= pool->max) { - /* - * Ensure we don't spawn more connections. If - * there are extra idle connections, we can - * delete all of them. - */ - spawn = 0; - /* leave extra alone from above */ - - /* - * min < num < max - * - * AND we don't have enough idle connections. - * Open some more. - */ - } else if (idle <= pool->spare) { - /* - * Not enough spare connections. Spawn a few. - * But cap the pool size at "max" - */ - spawn = pool->spare - idle; - extra = 0; - - if ((pool->num + pool->pending + spawn) > pool->max) { - spawn = pool->max - (pool->num + pool->pending); - } - - /* - * min < num < max - * - * We have more than enough idle connections, AND - * some are pending. Don't open or close any. - */ - } else if (pool->pending) { - spawn = 0; - extra = 0; - - /* - * We have too many idle connections, but closing - * some would take us below "min", so we only - * close enough to take us to "min". + * Get our pool config section */ - } else if ((pool->min + extra) >= pool->num) { - spawn = 0; - extra = pool->num - pool->min; - - } else { - /* - * Closing the "extra" connections won't take us - * below "min". It's therefore safe to close - * them all. - */ - spawn = 0; - /* leave extra alone from above */ - } + mycs = cf_section_sub_find(module, "pool"); + if (!mycs) { + DEBUG4("%s: Adding pool section to config item \"%s\" to store pool references", log_prefix, + cf_section_name(module)); - /* - * Only try to open spares if we're not already attempting to open - * a connection. Avoids spurious log messages. - */ - if (spawn) { - INFO("%s: %i of %u connections in use. Need more spares", pool->log_prefix, pool->active, pool->num); - pthread_mutex_unlock(&pool->mutex); - fr_connection_spawn(pool, now, false); /* ignore return code */ - pthread_mutex_lock(&pool->mutex); + mycs = cf_section_alloc(module, "pool", NULL); + cf_section_add(module, mycs); } /* - * We haven't spawned connections in a while, and there - * are too many spare ones. Close the one which has been - * unused for the longest. - */ - if (extra && (now >= (pool->last_spawned + pool->delay_interval))) { - fr_connection_t *found; - - found = NULL; - for (this = pool->tail; this != NULL; this = this->prev) { - if (this->in_use) continue; - - if (!found || - timercmp(&this->last_reserved, &found->last_reserved, <)) { - found = this; - } - } - - rad_assert(found != NULL); - - INFO("%s: Closing connection (%" PRIu64 "), from %d unused connections", pool->log_prefix, - found->number, extra); - fr_connection_close(pool, found); - - /* - * Decrease the delay for the next time we clean - * up. - */ - pool->next_delay >>= 1; - if (pool->next_delay == 0) pool->next_delay = 1; - pool->delay_interval += pool->next_delay; + * Sibling didn't have a pool config section + * Use our own local pool. + */ + if (!cs) { + DEBUG4("%s: \"%s.pool\" section not found, using \"%s.pool\"", log_prefix, + parent_name(cs), parent_name(mycs)); + cs = mycs; } /* - * Pass over all of the connections in the pool, limiting - * lifetime, idle time, max requests, etc. + * If fr_connection_pool_init has already been called + * for this config section, reuse the previous instance. + * + * This allows modules to pass in the config sections + * they would like to use the connection pool from. */ - for (this = pool->head; this != NULL; this = next) { - next = this->next; - fr_connection_manage(pool, this, now); + pool = cf_data_find(cs, CONNECTION_POOL_CF_KEY); + if (!pool) { + DEBUG4("%s: No pool reference found for config item \"%s.pool\"", log_prefix, parent_name(cs)); + pool = fr_connection_pool_init(cs, cs, opaque, c, a, log_prefix, trigger_prefix); + if (!pool) return NULL; + + DEBUG4("%s: Adding pool reference %p to config item \"%s.pool\"", log_prefix, pool, parent_name(cs)); + cf_data_add(cs, CONNECTION_POOL_CF_KEY, pool, NULL); + return pool; } + pool->ref++; - pool->last_checked = now; - pthread_mutex_unlock(&pool->mutex); + DEBUG4("%s: Found pool reference %p in config item \"%s.pool\"", log_prefix, pool, parent_name(cs)); - return 1; + /* + * We're reusing pool data add it to our local config + * section. This allows other modules to transitively + * re-use a pool through this module. + */ + if (mycs != cs) { + DEBUG4("%s: Copying pool reference %p from config item \"%s.pool\" to config item \"%s.pool\"", + log_prefix, pool, parent_name(cs), parent_name(mycs)); + cf_data_add(mycs, CONNECTION_POOL_CF_KEY, pool, NULL); + } + + return pool; } -/** Get a connection from the connection pool +/** Allocate a new pool using an existing one as a template * - * @param[in,out] pool to reserve the connection from. - * @param[in] spawn whether to spawn a new connection + * @param ctx to allocate new pool in. + * @param pool to copy. + * @param opaque data to pass to connection function. * @return - * - A pointer to the connection handle. + * - New connection pool. * - NULL on error. */ -static void *fr_connection_get_internal(fr_connection_pool_t *pool, bool spawn) +fr_connection_pool_t *fr_connection_pool_copy(TALLOC_CTX *ctx, fr_connection_pool_t *pool, void *opaque) { - time_t now; - fr_connection_t *this; + return fr_connection_pool_init(ctx, pool->cs, opaque, pool->create, + pool->alive, pool->log_prefix, pool->trigger_prefix); +} - if (!pool) return NULL; +/** Get the number of connections currently in the pool + * + * @param pool to count connections for. + * @return the number of connections in the pool + */ +int fr_connection_pool_get_num(fr_connection_pool_t *pool) +{ + return pool->num; +} - pthread_mutex_lock(&pool->mutex); +/** Get the opaque data associated with a pool + * + * @param pool to retrieve opaque data for. + * @return the opaque data for the pool. + */ +void *fr_connection_pool_get_opaque(fr_connection_pool_t *pool) +{ + return pool->opaque; +} - now = time(NULL); +/** Mark connections for reconnection, and spawn at least 'start' connections + * + * This intended to be called on a connection pool that's in use, to have it reflect + * a configuration change, or because the administrator knows that all connections + * in the pool are inviable and need to be reconnected. + * + * @param[in] pool to reconnect. + * @return + * - 0 On success. + * - -1 If we couldn't create start connections, this may be ignored + * depending on the context in which this function is being called. + */ +int fr_connection_pool_reconnect(fr_connection_pool_t *pool) +{ + uint32_t i; + fr_connection_t *this; + time_t now; /* - * Grab the link with the lowest latency, and check it - * for limits. If "connection manage" says the link is - * no longer usable, go grab another one. + * Mark all connections in the pool as requiring + * reconnection. */ - do { - this = fr_heap_peek(pool->heap); - if (!this) break; - } while (!fr_connection_manage(pool, this, now)); + pthread_mutex_lock(&pool->mutex); + for (this = pool->head; this; this = this->next) this->needs_reconnecting = true; /* - * We have a working connection. Extract it from the - * heap and use it. + * We want to ensure at least 'start' connections + * have been reconnected. We can't call reconnect + * because, we might get the same connection each + * time we reserve one, so we close 'start' + * connections, and then attempt to spawn them again. */ - if (this) { - /* - * Unless it needs reconnecting, in which - * case attempt to reconnect it. - */ - if (this->needs_reconnecting && !(this = fr_connection_reconnect_internal(pool, this))) { - pthread_mutex_unlock(&pool->mutex); - - ERROR("%s: Connection was marked for reconnection, but re-establishing connection failed", - pool->log_prefix); + for (i = 0; i < pool->start; i++) { + this = fr_heap_peek(pool->heap); + if (!this) break; /* There wasn't 'start' connections available */ - return NULL; - } - fr_heap_extract(pool->heap, this); - goto do_return; + fr_connection_close_internal(pool, this); } + pthread_mutex_unlock(&pool->mutex); + + now = time(NULL); /* - * We don't have a connection. Try to open a new one. + * Now attempt to spawn 'start' connections. */ - rad_assert(pool->active == pool->num); - - if (pool->num == pool->max) { - bool complain = false; + for (i = 0; i < pool->start; i++) { + this = fr_connection_spawn(pool, now, false); + if (!this) return -1; + } - /* - * Rate-limit complaints. - */ - if (pool->last_at_max != now) { - complain = true; - pool->last_at_max = now; - } + return 0; +} - pthread_mutex_unlock(&pool->mutex); +/** Delete a connection pool + * + * Closes, unlinks and frees all connections in the connection pool, then frees + * all memory used by the connection pool. + * + * @note Will call the 'stop' trigger. + * @note Must be called with the mutex free. + * + * @param[in,out] pool to delete. + */ +void fr_connection_pool_free(fr_connection_pool_t *pool) +{ + fr_connection_t *this; - if (!RATE_LIMIT_ENABLED || complain) { - ERROR("%s: No connections available and at max connection limit", pool->log_prefix); - } + if (!pool) return; - return NULL; + /* + * More modules hold a reference to this pool, don't free + * it yet. + */ + if (pool->ref > 0) { + pool->ref--; + return; } - pthread_mutex_unlock(&pool->mutex); - - if (!spawn) return NULL; + DEBUG("%s: Removing connection pool", pool->log_prefix); - DEBUG("%s: %i of %u connections in use. You may need to increase \"spare\"", pool->log_prefix, - pool->active, pool->num); - this = fr_connection_spawn(pool, now, true); /* MY connection! */ - if (!this) return NULL; pthread_mutex_lock(&pool->mutex); -do_return: - pool->active++; - this->num_uses++; - gettimeofday(&this->last_reserved, NULL); - this->in_use = true; + /* + * Don't loop over the list. Just keep removing the head + * until they're all gone. + */ + while ((this = pool->head) != NULL) { + INFO("%s: Closing connection (%" PRIu64 ")", pool->log_prefix, this->number); -#ifdef PTHREAD_DEBUG - this->pthread_id = pthread_self(); -#endif - pthread_mutex_unlock(&pool->mutex); + fr_connection_close_internal(pool, this); + } - DEBUG("%s: Reserved connection (%" PRIu64 ")", pool->log_prefix, this->number); + fr_heap_delete(pool->heap); - return this->connection; -} + fr_connection_exec_trigger(pool, "stop"); + + rad_assert(pool->head == NULL); + rad_assert(pool->tail == NULL); + rad_assert(pool->num == 0); + talloc_free(pool); +} /** Reserve a connection in the connection pool * @@ -1394,26 +1382,6 @@ void *fr_connection_get(fr_connection_pool_t *pool) return fr_connection_get_internal(pool, true); } -/** Get the number of connections currently in the pool - * - * @param pool to count connections for. - * @return the number of connections in the pool - */ -int fr_connection_get_num(fr_connection_pool_t *pool) -{ - return pool->num; -} - -/** Get the opaque data associated with a pool - * - * @param pool to retrieve opaque data for. - * @return the opaque data for the pool. - */ -void *fr_connection_get_opaque(fr_connection_pool_t *pool) -{ - return pool->opaque; -} - /** Release a connection * * Will mark a connection as unused and decrement the number of active @@ -1508,3 +1476,30 @@ void *fr_connection_reconnect(fr_connection_pool_t *pool, void *conn) return new_conn; } + +/** Delete a connection from the connection pool. + * + * Resolves the connection handle to a connection, then (if found) + * closes, unlinks and frees that connection. + * + * @note Must be called with the mutex free. + * + * @param[in,out] pool Connection pool to modify. + * @param[in] conn to delete. + * @return + * - 0 If the connection could not be found. + * - 1 if the connection was deleted. + */ +int fr_connection_close_internal(fr_connection_pool_t *pool, void *conn) +{ + fr_connection_t *this; + + this = fr_connection_find(pool, conn); + if (!this) return 0; + + INFO("%s: Deleting connection (%" PRIu64 ")", pool->log_prefix, this->number); + + fr_connection_close_internal(pool, this); + fr_connection_pool_check(pool); + return 1; +} diff --git a/src/modules/rlm_ldap/ldap.c b/src/modules/rlm_ldap/ldap.c index 8343cfaeb20..035466fc3d7 100644 --- a/src/modules/rlm_ldap/ldap.c +++ b/src/modules/rlm_ldap/ldap.c @@ -710,7 +710,7 @@ ldap_rcode_t rlm_ldap_bind(rlm_ldap_t const *inst, REQUEST *request, ldap_handle * For sanity, for when no connections are viable, * and we can't make a new one. */ - num = retry ? fr_connection_get_num(inst->pool) : 0; + num = retry ? fr_connection_pool_get_num(inst->pool) : 0; for (i = num; i >= 0; i--) { #ifdef HAVE_LDAP_SASL_INTERACTIVE_BIND if (sasl && sasl->mech) { @@ -870,7 +870,7 @@ ldap_rcode_t rlm_ldap_search(LDAPMessage **result, rlm_ldap_t const *inst, REQUE * For sanity, for when no connections are viable, * and we can't make a new one. */ - for (i = fr_connection_get_num(inst->pool); i >= 0; i--) { + for (i = fr_connection_pool_get_num(inst->pool); i >= 0; i--) { (void) ldap_search_ext((*pconn)->handle, dn, scope, filter, search_attrs, 0, serverctrls, clientctrls, &tv, 0, &msgid); @@ -997,7 +997,7 @@ ldap_rcode_t rlm_ldap_modify(rlm_ldap_t const *inst, REQUEST *request, ldap_hand * For sanity, for when no connections are viable, * and we can't make a new one. */ - for (i = fr_connection_get_num(inst->pool); i >= 0; i--) { + for (i = fr_connection_pool_get_num(inst->pool); i >= 0; i--) { RDEBUG2("Modifying object with DN \"%s\"", dn); (void) ldap_modify_ext((*pconn)->handle, dn, mods, NULL, NULL, &msgid); @@ -1577,7 +1577,7 @@ void mod_conn_release(rlm_ldap_t const *inst, ldap_handle_t *conn) * Instead, we let the next caller do the rebind. */ if (conn->referred) { - fr_connection_del(inst->pool, conn); + fr_connection_close_internal(inst->pool, conn); return; } diff --git a/src/modules/rlm_sql/sql.c b/src/modules/rlm_sql/sql.c index 2e79ab3a073..25c6c48ff4b 100644 --- a/src/modules/rlm_sql/sql.c +++ b/src/modules/rlm_sql/sql.c @@ -346,7 +346,7 @@ sql_rcode_t rlm_sql_query(rlm_sql_t *inst, REQUEST *request, rlm_sql_handle_t ** /* * inst->pool may be NULL is this function is called by mod_conn_create. */ - count = inst->pool ? fr_connection_get_num(inst->pool) : 0; + count = inst->pool ? fr_connection_pool_get_num(inst->pool) : 0; /* * Here we try with each of the existing connections, then try to create @@ -446,7 +446,7 @@ sql_rcode_t rlm_sql_select_query(rlm_sql_t *inst, REQUEST *request, rlm_sql_hand /* * inst->pool may be NULL is this function is called by mod_conn_create. */ - count = inst->pool ? fr_connection_get_num(inst->pool) : 0; + count = inst->pool ? fr_connection_pool_get_num(inst->pool) : 0; /* * For sanity, for when no connections are viable, and we can't make a new one