/** @} */
};
+/** An entry in a trunk watch function list
+ *
+ */
+typedef struct fr_trunk_watch_entry_s {
+ fr_dlist_t entry; //!< List entry.
+ fr_trunk_watch_t func; //!< Function to call when a trunk enters
+ ///< the state this list belongs to
+ bool oneshot; //!< Remove the function after it's called once.
+ bool enabled; //!< Whether the watch entry is enabled.
+ void *uctx; //!< User data to pass to the function.
+} fr_trunk_watch_entry_t;
+
/** Main trunk management handle
*
*/
void *in_handler; //!< Which handler we're inside.
void *uctx; //!< Uctx data to pass to alloc.
+
+ fr_dlist_head_t watch[FR_TRUNK_STATE_MAX]; //!< To be called when trunk changes state.
+
+ fr_trunk_watch_entry_t *next_watcher; //!< Watcher about to be run. Used to prevent nested watchers.
/** @} */
/** @name Timers
};
static size_t fr_trunk_conn_trigger_names_len = NUM_ELEMENTS(fr_trunk_conn_trigger_names);
+static fr_table_num_ordered_t const fr_trunk_states[] = {
+ { L("IDLE"), FR_TRUNK_STATE_IDLE },
+ { L("ACTIVE"), FR_TRUNK_STATE_ACTIVE },
+ { L("PENDING"), FR_TRUNK_STATE_PENDING }
+};
+static size_t fr_trunk_states_len = NUM_ELEMENTS(fr_trunk_states);
+
static fr_table_num_ordered_t const fr_trunk_connection_states[] = {
{ L("INIT"), FR_TRUNK_CONN_INIT },
{ L("HALTED"), FR_TRUNK_CONN_HALTED },
fr_heap_insert((_tconn)->pub.trunk->active, (_tconn)); \
} while (0)
+/** Call a list of watch functions associated with a state
+ *
+ */
+static inline void trunk_watch_call(fr_trunk_t *trunk, fr_dlist_head_t *list, fr_trunk_state_t state)
+{
+ /*
+ * Nested watcher calls are not allowed
+ * and shouldn't be possible because of
+ * deferred signal processing.
+ */
+ fr_assert(trunk->next_watcher == NULL);
+
+ while ((trunk->next_watcher = fr_dlist_next(list, trunk->next_watcher))) {
+ fr_trunk_watch_entry_t *entry = trunk->next_watcher;
+ bool oneshot = entry->oneshot; /* Watcher could be freed, so store now */
+
+ if (!entry->enabled) continue;
+ if (oneshot) trunk->next_watcher = fr_dlist_remove(list, entry);
+
+ entry->func(trunk, trunk->pub.state, state, entry->uctx);
+
+ if (oneshot) talloc_free(entry);
+ }
+ trunk->next_watcher = NULL;
+}
+
+/** Call the state change watch functions
+ *
+ */
+#define CALL_WATCHERS(_trunk, _state) \
+do { \
+ if (fr_dlist_empty(&(_trunk)->watch[_state])) break; \
+ trunk_watch_call((_trunk), &(_trunk)->watch[_state], _state); \
+} while(0)
+
+/** Remove a watch function from a trunk state list
+ *
+ * @param[in] trunk The trunk to remove the watcher from.
+ * @param[in] state to remove the watch from.
+ * @param[in] watch Function to remove.
+ * @return
+ * - 0 if the function was removed successfully.
+ * - -1 if the function wasn't present in the watch list.
+ * - -2 if an invalid state was passed.
+ */
+int fr_trunk_del_watch(fr_trunk_t *trunk, fr_trunk_state_t state, fr_trunk_watch_t watch)
+{
+ fr_trunk_watch_entry_t *entry = NULL;
+ fr_dlist_head_t *list;
+
+ if (state >= FR_TRUNK_STATE_MAX) return -2;
+
+ list = &trunk->watch[state];
+ while ((entry = fr_dlist_next(list, entry))) {
+ if (entry->func == watch) {
+ if (trunk->next_watcher == entry) {
+ trunk->next_watcher = fr_dlist_remove(list, entry);
+ } else {
+ fr_dlist_remove(list, entry);
+ }
+ talloc_free(entry);
+ return 0;
+ }
+ }
+
+ return -1;
+}
+
+/** Add a watch entry to the trunk state list
+ *
+ * @param[in] trunk The trunk to add the watcher to.
+ * @param[in] state to watch for.
+ * @param[in] watch Function to add.
+ * @param[in] oneshot Should this watcher only be run once.
+ * @param[in] uctx Context to pass to function.
+ * @return
+ * - NULL if an invlaid state is passed.
+ * - A new watch entry handle on success.
+ */
+fr_trunk_watch_entry_t *fr_trunk_add_watch(fr_trunk_t *trunk, fr_trunk_state_t state,
+ fr_trunk_watch_t watch, bool oneshot, void const *uctx)
+{
+ fr_trunk_watch_entry_t *entry;
+ fr_dlist_head_t *list;
+
+ if (state >= FR_TRUNK_STATE_MAX) return NULL;
+
+ list = &trunk->watch[state];
+ MEM(entry = talloc_zero(trunk, fr_trunk_watch_entry_t));
+
+ entry->func = watch;
+ entry->oneshot = oneshot;
+ entry->enabled = true;
+ memcpy(&entry->uctx, &uctx, sizeof(entry->uctx));
+ fr_dlist_insert_tail(list, entry);
+
+ return entry;
+}
+
+#define TRUNK_STATE_TRANSITION(_new) \
+do { \
+ DEBUG3("Trunk changed state %s -> %s", \
+ fr_table_str_by_value(fr_trunk_states, trunk->pub.state, "<INVALID>"), \
+ fr_table_str_by_value(fr_trunk_states, _new, "<INVALID>")); \
+ CALL_WATCHERS(trunk, _new); \
+ trunk->pub.state = _new; \
+} while (0);
+
static void trunk_request_enter_backlog(fr_trunk_request_t *treq, bool new);
static void trunk_request_enter_pending(fr_trunk_request_t *treq, fr_trunk_connection_t *tconn, bool new);
static void trunk_request_enter_partial(fr_trunk_request_t *treq);
uint32_t average = 0;
uint32_t req_count;
uint16_t conn_count;
+ fr_trunk_state_t new_state;
DEBUG3("Managing trunk");
while ((tconn = fr_dlist_head(&trunk->to_free))) talloc_free(fr_dlist_remove(&trunk->to_free, tconn));
}
+ /*
+ * Update the state of the trunk
+ */
+ if (fr_trunk_connection_count_by_state(trunk, FR_TRUNK_CONN_ACTIVE)) {
+ new_state = FR_TRUNK_STATE_ACTIVE;
+ } else {
+ /*
+ * INIT / CONNECTING / FULL mean connections will become active
+ * so the trunk is PENDING
+ */
+ new_state = fr_trunk_connection_count_by_state(trunk, FR_TRUNK_CONN_INIT |
+ FR_TRUNK_CONN_CONNECTING |
+ FR_TRUNK_CONN_FULL) ?
+ FR_TRUNK_STATE_PENDING : FR_TRUNK_STATE_IDLE;
+ }
+
+ if (new_state != trunk->pub.state) TRUNK_STATE_TRANSITION(new_state);
+
/*
* A trunk can be signalled to not proactively
* manage connections if a destination is known
{
fr_trunk_connection_t *tconn;
fr_trunk_request_t *treq;
+ fr_trunk_watch_entry_t *watch;
+ size_t i;
DEBUG4("Trunk free %p", trunk);
*/
while ((treq = fr_dlist_head(&trunk->free_requests))) talloc_free(treq);
+ /*
+ * Free any entries in the watch lists
+ */
+ for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
+ while ((watch = fr_dlist_pop_head(&trunk->watch[i]))) talloc_free(watch);
+ }
+
return 0;
}
char const *log_prefix, void const *uctx, bool delay_start)
{
fr_trunk_t *trunk;
+ size_t i;
/*
* Check we have the functions we need
fr_dlist_talloc_init(&trunk->draining_to_free, fr_trunk_connection_t, entry);
fr_dlist_talloc_init(&trunk->to_free, fr_trunk_connection_t, entry);
+ /*
+ * Watch lists
+ */
+ for (i = 0; i < NUM_ELEMENTS(trunk->watch); i++) {
+ fr_dlist_talloc_init(&trunk->watch[i], fr_trunk_watch_entry_t, entry);
+ }
+
DEBUG4("Trunk allocated %p", trunk);
if (!delay_start) {
FR_TRUNK_CANCEL_REASON_REQUEUE //!< A previously sent request is being requeued.
} fr_trunk_cancel_reason_t;
+typedef enum {
+ FR_TRUNK_STATE_IDLE = 0, //!< Trunk has no connections
+ FR_TRUNK_STATE_ACTIVE, //!< Trunk has active connections
+ FR_TRUNK_STATE_PENDING, //!< Trunk has connections, but none are active
+ FR_TRUNK_STATE_MAX
+} fr_trunk_state_t;
+
/** What type of I/O events the trunk connection is currently interested in receiving
*
*/
/** @} */
bool _CONST triggers; //!< do we run the triggers?
+
+ fr_trunk_state_t _CONST state; //!< Current state of the trunk.
};
/** Public fields for the trunk request
*/
typedef void (*fr_trunk_request_free_t)(request_t *request, void *preq_to_free, void *uctx);
+/** Receive a notification when a trunk enters a particular state
+ *
+ * @param[in] trunk Being watched.
+ * @param[in] prev State we came from.
+ * @param[in] state State that was entered (the current state)
+ * @param[in] uctx that was passed to fr_trunk_add_watch_*.
+ */
+typedef void(*fr_trunk_watch_t)(fr_trunk_t *trunk,
+ fr_trunk_state_t prev, fr_trunk_state_t state, void *uctx);
+
+typedef struct fr_trunk_watch_entry_s fr_trunk_watch_entry_t;
+
/** I/O functions to pass to fr_trunk_alloc
*
*/
char const *log_prefix, void const *uctx, bool delay_start) CC_HINT(nonnull(2, 3, 4));
/** @} */
+/** @name Watchers
+ * @{
+ */
+fr_trunk_watch_entry_t *fr_trunk_add_watch(fr_trunk_t *trunk, fr_trunk_state_t state,
+ fr_trunk_watch_t watch, bool oneshot, void const *uctx) CC_HINT(nonnull(1));
+
+int fr_trunk_del_watch(fr_trunk_t *trunk, fr_trunk_state_t state, fr_trunk_watch_t watch);
+/** @} */
+
#undef _CONST
#ifdef __cplusplus