From: Nick Porter Date: Mon, 20 Sep 2021 12:58:44 +0000 (+0100) Subject: v4: Add a status to trunks and watchers on state changes (#4224) X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d518eef09e7fa1967108b2bb89d6fc9f9a4667ed;p=thirdparty%2Ffreeradius-server.git v4: Add a status to trunks and watchers on state changes (#4224) * Define states of a trunk * Add lookup table for names of trunk states * Update trunk state on each call of the management timer * Define structure and function type for trunk watchers * Add lists of watchers to trunks * Add functions to add / remove trunk state change watchers * Call trunk state change watchers --- diff --git a/src/lib/server/trunk.c b/src/lib/server/trunk.c index 1c1a533be7..8483a6ecd9 100644 --- a/src/lib/server/trunk.c +++ b/src/lib/server/trunk.c @@ -171,6 +171,18 @@ struct fr_trunk_connection_s { /** @} */ }; +/** An entry in a trunk watch function list + * + */ +typedef struct fr_trunk_watch_entry_s { + fr_dlist_t entry; //!< List entry. + fr_trunk_watch_t func; //!< Function to call when a trunk enters + ///< the state this list belongs to + bool oneshot; //!< Remove the function after it's called once. + bool enabled; //!< Whether the watch entry is enabled. + void *uctx; //!< User data to pass to the function. +} fr_trunk_watch_entry_t; + /** Main trunk management handle * */ @@ -240,6 +252,10 @@ struct fr_trunk_s { void *in_handler; //!< Which handler we're inside. void *uctx; //!< Uctx data to pass to alloc. + + fr_dlist_head_t watch[FR_TRUNK_STATE_MAX]; //!< To be called when trunk changes state. + + fr_trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers. /** @} */ /** @name Timers @@ -361,6 +377,13 @@ static fr_table_num_indexed_bit_pos_t const fr_trunk_conn_trigger_names[] = { }; static size_t fr_trunk_conn_trigger_names_len = NUM_ELEMENTS(fr_trunk_conn_trigger_names); +static fr_table_num_ordered_t const fr_trunk_states[] = { + { L("IDLE"), FR_TRUNK_STATE_IDLE }, + { L("ACTIVE"), FR_TRUNK_STATE_ACTIVE }, + { L("PENDING"), FR_TRUNK_STATE_PENDING } +}; +static size_t fr_trunk_states_len = NUM_ELEMENTS(fr_trunk_states); + static fr_table_num_ordered_t const fr_trunk_connection_states[] = { { L("INIT"), FR_TRUNK_CONN_INIT }, { L("HALTED"), FR_TRUNK_CONN_HALTED }, @@ -738,6 +761,114 @@ do { \ fr_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \ } while (0) +/** Call a list of watch functions associated with a state + * + */ +static inline void trunk_watch_call(fr_trunk_t *trunk, fr_dlist_head_t *list, fr_trunk_state_t state) +{ + /* + * Nested watcher calls are not allowed + * and shouldn't be possible because of + * deferred signal processing. + */ + fr_assert(trunk->next_watcher == NULL); + + while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) { + fr_trunk_watch_entry_t *entry = trunk->next_watcher; + bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */ + + if (!entry->enabled) continue; + if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry); + + entry->func(trunk, trunk->pub.state, state, entry->uctx); + + if (oneshot) talloc_free(entry); + } + trunk->next_watcher = NULL; +} + +/** Call the state change watch functions + * + */ +#define CALL_WATCHERS(_trunk, _state) \ +do { \ + if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \ + trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \ +} while(0) + +/** Remove a watch function from a trunk state list + * + * @param[in] trunk The trunk to remove the watcher from. + * @param[in] state to remove the watch from. + * @param[in] watch Function to remove. + * @return + * - 0 if the function was removed successfully. + * - -1 if the function wasn't present in the watch list. + * - -2 if an invalid state was passed. + */ +int fr_trunk_del_watch(fr_trunk_t *trunk, fr_trunk_state_t state, fr_trunk_watch_t watch) +{ + fr_trunk_watch_entry_t *entry = NULL; + fr_dlist_head_t *list; + + if (state >= FR_TRUNK_STATE_MAX) return -2; + + list = &trunk->watch[state]; + while ((entry = fr_dlist_next(list, entry))) { + if (entry->func == watch) { + if (trunk->next_watcher == entry) { + trunk->next_watcher = fr_dlist_remove(list, entry); + } else { + fr_dlist_remove(list, entry); + } + talloc_free(entry); + return 0; + } + } + + return -1; +} + +/** Add a watch entry to the trunk state list + * + * @param[in] trunk The trunk to add the watcher to. + * @param[in] state to watch for. + * @param[in] watch Function to add. + * @param[in] oneshot Should this watcher only be run once. + * @param[in] uctx Context to pass to function. + * @return + * - NULL if an invlaid state is passed. + * - A new watch entry handle on success. + */ +fr_trunk_watch_entry_t *fr_trunk_add_watch(fr_trunk_t *trunk, fr_trunk_state_t state, + fr_trunk_watch_t watch, bool oneshot, void const *uctx) +{ + fr_trunk_watch_entry_t *entry; + fr_dlist_head_t *list; + + if (state >= FR_TRUNK_STATE_MAX) return NULL; + + list = &trunk->watch[state]; + MEM(entry = talloc_zero(trunk, fr_trunk_watch_entry_t)); + + entry->func = watch; + entry->oneshot = oneshot; + entry->enabled = true; + memcpy(&entry->uctx, &uctx, sizeof(entry->uctx)); + fr_dlist_insert_tail(list, entry); + + return entry; +} + +#define TRUNK_STATE_TRANSITION(_new) \ +do { \ + DEBUG3("Trunk changed state %s -> %s", \ + fr_table_str_by_value(fr_trunk_states, trunk->pub.state, ""), \ + fr_table_str_by_value(fr_trunk_states, _new, "")); \ + CALL_WATCHERS(trunk, _new); \ + trunk->pub.state = _new; \ +} while (0); + static void trunk_request_enter_backlog(fr_trunk_request_t *treq, bool new); static void trunk_request_enter_pending(fr_trunk_request_t *treq, fr_trunk_connection_t *tconn, bool new); static void trunk_request_enter_partial(fr_trunk_request_t *treq); @@ -3850,6 +3981,7 @@ static void trunk_manage(fr_trunk_t *trunk, fr_time_t now) uint32_t average = 0; uint32_t req_count; uint16_t conn_count; + fr_trunk_state_t new_state; DEBUG3("Managing trunk"); @@ -3876,6 +4008,24 @@ static void trunk_manage(fr_trunk_t *trunk, fr_time_t now) while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn)); } + /* + * Update the state of the trunk + */ + if (fr_trunk_connection_count_by_state(trunk, FR_TRUNK_CONN_ACTIVE)) { + new_state = FR_TRUNK_STATE_ACTIVE; + } else { + /* + * INIT / CONNECTING / FULL mean connections will become active + * so the trunk is PENDING + */ + new_state = fr_trunk_connection_count_by_state(trunk, FR_TRUNK_CONN_INIT | + FR_TRUNK_CONN_CONNECTING | + FR_TRUNK_CONN_FULL) ? + FR_TRUNK_STATE_PENDING : FR_TRUNK_STATE_IDLE; + } + + if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state); + /* * A trunk can be signalled to not proactively * manage connections if a destination is known @@ -4471,6 +4621,8 @@ static int _trunk_free(fr_trunk_t *trunk) { fr_trunk_connection_t *tconn; fr_trunk_request_t *treq; + fr_trunk_watch_entry_t *watch; + size_t i; DEBUG4("Trunk free %p", trunk); @@ -4513,6 +4665,13 @@ static int _trunk_free(fr_trunk_t *trunk) */ while ((treq = fr_dlist_head(&trunk->free_requests))) talloc_free(treq); + /* + * Free any entries in the watch lists + */ + for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) { + while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch); + } + return 0; } @@ -4547,6 +4706,7 @@ fr_trunk_t *fr_trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *log_prefix, void const *uctx, bool delay_start) { fr_trunk_t *trunk; + size_t i; /* * Check we have the functions we need @@ -4594,6 +4754,13 @@ fr_trunk_t *fr_trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, fr_dlist_talloc_init(&trunk->draining_to_free, fr_trunk_connection_t, entry); fr_dlist_talloc_init(&trunk->to_free, fr_trunk_connection_t, entry); + /* + * Watch lists + */ + for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) { + fr_dlist_talloc_init(&trunk->watch[i], fr_trunk_watch_entry_t, entry); + } + DEBUG4("Trunk allocated %p", trunk); if (!delay_start) { diff --git a/src/lib/server/trunk.h b/src/lib/server/trunk.h index 879e319260..ddf117197f 100644 --- a/src/lib/server/trunk.h +++ b/src/lib/server/trunk.h @@ -59,6 +59,13 @@ typedef enum { FR_TRUNK_CANCEL_REASON_REQUEUE //!< A previously sent request is being requeued. } fr_trunk_cancel_reason_t; +typedef enum { + FR_TRUNK_STATE_IDLE = 0, //!< Trunk has no connections + FR_TRUNK_STATE_ACTIVE, //!< Trunk has active connections + FR_TRUNK_STATE_PENDING, //!< Trunk has connections, but none are active + FR_TRUNK_STATE_MAX +} fr_trunk_state_t; + /** What type of I/O events the trunk connection is currently interested in receiving * */ @@ -292,6 +299,8 @@ struct fr_trunk_pub_s { /** @} */ bool _CONST triggers; //!< do we run the triggers? + + fr_trunk_state_t _CONST state; //!< Current state of the trunk. }; /** Public fields for the trunk request @@ -665,6 +674,18 @@ typedef void (*fr_trunk_request_fail_t)(request_t *request, void *preq, void *rc */ typedef void (*fr_trunk_request_free_t)(request_t *request, void *preq_to_free, void *uctx); +/** Receive a notification when a trunk enters a particular state + * + * @param[in] trunk Being watched. + * @param[in] prev State we came from. + * @param[in] state State that was entered (the current state) + * @param[in] uctx that was passed to fr_trunk_add_watch_*. + */ +typedef void(*fr_trunk_watch_t)(fr_trunk_t *trunk, + fr_trunk_state_t prev, fr_trunk_state_t state, void *uctx); + +typedef struct fr_trunk_watch_entry_s fr_trunk_watch_entry_t; + /** I/O functions to pass to fr_trunk_alloc * */ @@ -841,6 +862,15 @@ fr_trunk_t *fr_trunk_alloc(TALLOC_CTX *ctx, fr_event_list_t *el, char const *log_prefix, void const *uctx, bool delay_start) CC_HINT(nonnull(2, 3, 4)); /** @} */ +/** @name Watchers + * @{ + */ +fr_trunk_watch_entry_t *fr_trunk_add_watch(fr_trunk_t *trunk, fr_trunk_state_t state, + fr_trunk_watch_t watch, bool oneshot, void const *uctx) CC_HINT(nonnull(1)); + +int fr_trunk_del_watch(fr_trunk_t *trunk, fr_trunk_state_t state, fr_trunk_watch_t watch); +/** @} */ + #undef _CONST #ifdef __cplusplus