struct fr_bio_retry_s {
FR_BIO_COMMON;
- fr_rb_tree_t next_retry_tree; //!< when packets are retried next
- fr_rb_tree_t expiry_tree; //!< when packets expire, so that we expire packets when the socket is blocked.
+ fr_timer_list_t *next_tl; //!< when packets are retried next
+ fr_timer_list_t *expiry_tl; //!< when packets expire, so that we expire packets when the socket is blocked.
fr_bio_retry_info_t info;
ssize_t error;
bool all_used; //!< blocked due to no free entries
- fr_timer_t *ev; //!< we only need one timer event: next time we do something
-
- /*
- * The first item is cached here so that we can detect when it changes. The insert / delete
- * code can just do its work without worrying about timers. And then when the tree manipulation
- * is done, call the fr_bio_retry_timer_reset() function to reset (or not) the timer.
- */
- fr_bio_retry_entry_t *next_retry_item; //!< for timers
-
/*
* Cache a partial write when IO is blocked. Partial
* packets are left in the timer tree so that they can be expired.
FR_DLIST_HEAD(fr_bio_retry_list) free; //!< free lists are better than memory fragmentation
};
-static void fr_bio_retry_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx);
-static void fr_bio_retry_expiry_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx);
static ssize_t fr_bio_retry_write(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
static ssize_t fr_bio_retry_save_write(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, ssize_t rcode);
-#define fr_bio_retry_timer_clear(_x) do { \
- talloc_const_free((_x)->ev); \
- (_x)->next_retry_item = NULL; \
- } while (0)
-
-/** Reset the expiry timer after expiring one element
- *
- */
-static int fr_bio_retry_expiry_timer_reset(fr_bio_retry_t *my)
-{
- fr_bio_retry_entry_t *first;
-
- fr_assert(my->info.write_blocked);
-
- /*
- * Nothing to do, don't set any timers.
- */
- first = fr_rb_first(&my->expiry_tree);
- if (!first) {
- fr_bio_retry_timer_clear(my);
- return 0;
- }
-
- /*
- * The timer is already set correctly, we're done.
- */
- if (first == my->next_retry_item) return 0;
-
- /*
- * Update the timer. This should never fail.
- */
- if (fr_timer_at(my, my->info.el->tl, &my->ev, first->retry.end, false, fr_bio_retry_expiry_timer, my) < 0) return -1;
-
- my->next_retry_item = first;
- return 0;
-}
-
-
-/** Reset the timer after changing the rb tree.
- *
- */
-static int fr_bio_retry_timer_reset(fr_bio_retry_t *my)
-{
- fr_bio_retry_entry_t *first;
-
- if (my->info.write_blocked) return fr_bio_retry_expiry_timer_reset(my);
-
- /*
- * Nothing to do, don't set any timers.
- */
- first = fr_rb_first(&my->next_retry_tree);
- if (!first) {
- cancel_timer:
- fr_bio_retry_timer_clear(my);
- return 0;
- }
-
- /*
- * We're partially writing a response. Don't bother with the timer, and delete any existing
- * timer. It will be reset when the partial entry is placed back into the queue.
- */
- if (first == my->partial) goto cancel_timer;
-
- /*
- * The timer is already set correctly, we're done.
- */
- if (first == my->next_retry_item) return 0;
-
- /*
- * Update the timer. This should never fail.
- */
- if (fr_timer_at(my, my->info.el->tl, &my->ev, first->retry.next, false, fr_bio_retry_timer, my) < 0) return -1;
-
- my->next_retry_item = first;
- return 0;
-}
-
/** Release an entry back to the free list.
*
*/
static void fr_bio_retry_release(fr_bio_retry_t *my, fr_bio_retry_entry_t *item, fr_bio_retry_release_reason_t reason)
{
- bool timer_reset = false;
+ item->cancelled = true;
/*
- * Remove the item before calling the application "release" function.
+ * Remove the item from all timer lists before calling the application "release" function.
+ *
+ * reserved items (e.g. application-layer watchdogs like Status-Server) are run by the
+ * application, and aren't inserted into any tree.
*/
- if (my->partial != item) {
- if (!item->reserved) {
- (void) fr_rb_remove_by_inline_node(&my->next_retry_tree, &item->next_retry_node);
- (void) fr_rb_remove_by_inline_node(&my->expiry_tree, &item->expiry_node);
- }
- } else {
- item->cancelled = true;
+ if (!item->reserved) {
+ (void) fr_timer_uctx_remove(my->next_tl, item);
+ (void) fr_timer_uctx_remove(my->expiry_tl, item);
}
/*
if (my->partial == item) return;
/*
- * We're deleting the timer entry, make sure that we clean up its events,
+ * This item is reserved. The application has cached a pointer to it, so it never gets returned
+ * to the free list.
*/
- if (my->next_retry_item == item) {
- fr_bio_retry_timer_clear(my);
- timer_reset = true;
- }
+ if (item->reserved) return;
/*
- * If we were blocked due to having no free entries, then resume writes as soon as we create a free entry.
+ * If we were blocked due to having no free entries, then we can resume writes, since we now have
+ * a free entry.
*/
if (my->all_used) {
fr_assert(fr_bio_retry_list_num_elements(&my->free) == 0);
/*
* The application MUST call fr_bio_retry_write_resume(), which will check if IO is
* actually blocked.
+ *
+ * @todo - make this function return a failure, OR update the ctx with a failure? OR
+ * call a bio error function on failure? That way we can just call write_resume() from here.
*/
my->all_used = false;
- if (my->cb.write_resume) (void) my->cb.write_resume(&my->bio);
+ if (!my->info.write_blocked && my->cb.write_resume) (void) my->cb.write_resume(&my->bio);
}
- /*
- * If write_resume() above called the application, then it might have already updated the timer.
- * Don't do that again.
- */
- if (timer_reset) (void) fr_bio_retry_timer_reset(my);
-
item->packet_ctx = NULL;
- fr_assert(my->next_retry_item != item);
fr_bio_retry_list_insert_head(&my->free, item);
}
fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
if (my->info.write_blocked) {
- fr_assert(!my->ev);
return 1;
}
- my->info.write_blocked = true;
+ /*
+ * Disarm the retry timer, and enable the expiry timer.
+ *
+ * i.e. we won't retry packets, but we will expire them when their timer runs out.
+ */
+ if (fr_timer_list_disarm(my->next_tl) < 0) return fr_bio_error(GENERIC);
+
+ if (fr_timer_list_arm(my->expiry_tl) < 0) return fr_bio_error(GENERIC);
- fr_bio_retry_timer_clear(my);
- if (fr_bio_retry_expiry_timer_reset(my) < 0) return fr_bio_error(GENERIC);
+ my->info.write_blocked = true;
return 1;
}
if (fr_time_lteq(my->info.first_sent, my->info.last_idle)) my->info.first_sent = now;
}
+ fr_assert(fr_time_gt(item->retry.next, now));
+
+ /*
+ * We rewrote the "next" timer. Remove the item from the timer tree, which doesn't call the cmp
+ * function and therefore doesn't care that the time has changed. Then re-insert it, which does
+ * call the cmp function.
+ */
+ (void) fr_timer_uctx_remove(my->next_tl, item);
+ (void) fr_timer_uctx_insert(my->next_tl, item);
+
/*
* Write out the packet. On failure release this item.
*
return 0;
}
- /*
- * We wrote the whole packet. Re-insert it, which is done _without_ doing calls to
- * cmp(), so we it's OK for us to rewrite item->retry.next.
- */
- (void) fr_rb_remove_by_inline_node(&my->next_retry_tree, &item->next_retry_node);
- (void) fr_rb_insert(&my->next_retry_tree, item);
-
return 1;
}
-/*
- * Check for the "next next" retry. If that's still in the past,
- * then skip it. But _don't_ update retry.count, as we don't
- * send packets. Instead, just enforce MRD, etc.
- */
-static int fr_bio_retry_write_delayed(fr_bio_retry_t *my, fr_time_t now)
-{
- fr_bio_retry_entry_t *item;
-
- /*
- * We can't be in this function if there's a partial packet. We must be in
- * fr_bio_retry_write_partial().
- */
- fr_assert(!my->partial);
- fr_assert(!my->info.write_blocked);
-
- while ((item = fr_rb_first(&my->next_retry_tree)) != NULL) {
- int rcode;
-
- /*
- * This item needs to be sent in the future, we're done.
- */
- if (fr_time_cmp(now, item->retry.next) > 0) break;
-
- /*
- * Write one item, and don't update timers.
- */
- rcode = fr_bio_retry_write_item(my, item, now);
- if (rcode <= 0) return rcode;
- }
-
- /*
- * Now that we've written multiple items, reset the timer.
- *
- * We do this at the end of the loop so that we don't update it for each item in the loop.
- *
- * @todo - set generic write error?
- */
- (void) fr_bio_retry_timer_reset(my);
-
- return 1;
-}
-
-
/** Resume writes.
*
* On resume, we try to flush any pending packets which should have been sent.
static int fr_bio_retry_write_resume(fr_bio_t *bio)
{
fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
- int rcode;
if (!my->info.write_blocked) return 1;
- rcode = fr_bio_retry_write_delayed(my, fr_time());
- if (rcode <= 0) return rcode;
-
my->info.write_blocked = false;
- fr_bio_retry_timer_clear(my);
- (void) fr_bio_retry_timer_reset(my);
+ /*
+ * Disarm the expiry list, and rearm the next retry list.
+ *
+ * Rearming the next retry list will cause all pending events to be run. Which means calling the
+ * write routine for each item. If the write ends up blocking, it will disarm the next retry
+ * timer, re-arm the expiry timer, and then set the write_blocked flag.
+ */
+ (void) fr_timer_list_disarm(my->expiry_tl);
+ (void) fr_timer_list_arm(my->next_tl);
- return 1;
+ return !my->info.write_blocked; /* return 0 for "can't resume" and 1 for "can resume" */
}
fr_bio_t *next;
fr_bio_retry_entry_t *item = my->partial;
- fr_assert(!my->next_retry_item);
- fr_assert(!my->ev);
fr_assert(my->partial != NULL);
fr_assert(my->buffer.start);
/** Run an expiry timer event.
*
*/
-static void fr_bio_retry_expiry_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx)
+static void fr_bio_retry_expiry_timer(UNUSED fr_timer_list_t *tl, UNUSED fr_time_t now, void *uctx)
{
- fr_bio_retry_t *my = talloc_get_type_abort(uctx, fr_bio_retry_t);
- fr_bio_retry_entry_t *item;
- fr_time_t expires;
+ fr_bio_retry_entry_t *item = talloc_get_type_abort(uctx, fr_bio_retry_entry_t);
+ fr_bio_retry_t *my = item->my;
/*
- * For the timer to be running, there must be a "first" entry which causes the timer to fire.
- *
- * There must also be no partially written entry. If the IO is blocked, then all timers are
- * suspended.
+ * We only expire entries if writing is blocked.
*/
- fr_assert(my->next_retry_item != NULL);
- fr_assert(!my->partial);
fr_assert(my->info.write_blocked);
/*
- * We should be expiring at least one entry, so nuke the timers.
- */
- my->next_retry_item = NULL;
-
- /*
- * Expire all entries which are within 10ms of "now". That way we don't reset the event many
- * times in short succession.
+ * An item is DONE if it received a reply, then waited for another reply, and then the socket
+ * became blocked.
*/
- expires = fr_time_add(now, fr_time_delta_from_msec(10));
-
- while ((item = fr_rb_first(&my->expiry_tree)) != NULL) {
- if (fr_time_gt(item->retry.end, expires)) break;
-
- fr_bio_retry_release(my, item, (item->retry.replies > 0) ? FR_BIO_RETRY_DONE : FR_BIO_RETRY_NO_REPLY);
- }
-
- (void) fr_bio_retry_expiry_timer_reset(my);
+ fr_bio_retry_release(my, item, (item->retry.replies > 0) ? FR_BIO_RETRY_DONE : FR_BIO_RETRY_NO_REPLY);
}
/** Run a timer event. Usually to write out another packet.
*
*/
-static void fr_bio_retry_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx)
+static void fr_bio_retry_next_timer(UNUSED fr_timer_list_t *tl, fr_time_t now, void *uctx)
{
ssize_t rcode;
- fr_bio_retry_t *my = talloc_get_type_abort(uctx, fr_bio_retry_t);
- fr_bio_retry_entry_t *item;
+ fr_bio_retry_entry_t *item = talloc_get_type_abort(uctx, fr_bio_retry_entry_t);
+ fr_bio_retry_t *my = item->my;
- /*
- * For the timer to be running, there must be a "first" entry which causes the timer to fire.
- *
- * There must also be no partially written entry. If the IO is blocked, then all timers are
- * suspended.
- */
- fr_assert(my->next_retry_item != NULL);
fr_assert(my->partial == NULL);
-
- item = my->next_retry_item;
- my->next_retry_item = NULL;
+ fr_assert(!my->info.write_blocked);
/*
* Retry one item.
my->error = rcode;
my->bio.write = fr_bio_retry_write_fatal;
return;
- }
-
- /*
- * Partial write - no timers get set. We need to wait until the descriptor is writable.
- */
- if (rcode == 0) {
- fr_assert(my->partial != NULL);
- return;
- }
-
- /*
- * We successfull wrote this item. Reset the timer to the next one, which is likely to be a
- * different one from the item we just updated.
- */
- (void) fr_bio_retry_timer_reset(my);
+ }
}
/** Write a request, and see if we have a reply.
/*
* Grab the first item which can be expired.
*/
- item = fr_rb_first(&my->expiry_tree);
+ item = fr_timer_uctx_peek(my->expiry_tl);
fr_assert(item != NULL);
/*
/*
* This should never fail.
*/
- (void) fr_rb_insert(&my->next_retry_tree, item);
- (void) fr_rb_insert(&my->expiry_tree, item);
+ (void) fr_timer_uctx_insert(my->next_tl, item);
+ (void) fr_timer_uctx_insert(my->expiry_tl, item);
/*
* We only wrote part of the packet, remember to write the rest of it.
return fr_bio_retry_save_write(my, item, rcode);
}
- /*
- * We've just inserted this packet into the timer tree, so it can't be used as the current timer.
- * Once we've inserted it, we update the timer.
- */
- fr_assert(my->next_retry_item != item);
-
- /*
- * If we can't set the timer, then release this item.
- */
- if (fr_bio_retry_timer_reset(my) < 0) {
- fr_bio_retry_release(my, item, FR_BIO_RETRY_CANCELLED);
- return fr_bio_error(GENERIC);
- }
-
return size;
}
*/
item->retry.next = fr_time_add_time_delta(item->retry.start, my->retry_config.mrd);
- (void) fr_rb_remove_by_inline_node(&my->next_retry_tree, &item->next_retry_node);
- (void) fr_rb_insert(&my->next_retry_tree, item);
- (void) fr_bio_retry_timer_reset(my);
+ (void) fr_timer_uctx_remove(my->next_tl, item);
+ (void) fr_timer_uctx_insert(my->next_tl, item);
return rcode;
}
* No item passed, try to cancel the first one to expire.
*/
if (!item) {
- item = fr_rb_first(&my->expiry_tree);
+ item = fr_timer_uctx_peek(my->expiry_tl);
if (!item) return 0;
/*
*/
static int fr_bio_retry_destructor(fr_bio_retry_t *my)
{
- fr_rb_iter_inorder_t iter;
fr_bio_retry_entry_t *item;
- fr_bio_retry_timer_clear(my);
+ fr_timer_list_disarm(my->next_tl);
+ fr_timer_list_disarm(my->expiry_tl);
/*
* Cancel all outgoing packets. Don't bother updating the tree or the free list, as all of the
* entries will be deleted when the memory is freed.
*/
- while ((item = fr_rb_iter_init_inorder(&iter, &my->next_retry_tree)) != NULL) {
- fr_rb_iter_delete_inorder(&iter);
+ while ((item = fr_timer_uctx_peek(my->next_tl)) != NULL) {
+ (void) fr_timer_uctx_remove(my->next_tl, item);
my->release((fr_bio_t *) my, item, FR_BIO_RETRY_CANCELLED);
}
fr_bio_retry_list_insert_tail(&my->free, &items[i]);
}
- (void) fr_rb_inline_init(&my->next_retry_tree, fr_bio_retry_entry_t, next_retry_node, _next_retry_cmp, NULL);
- (void) fr_rb_inline_init(&my->expiry_tree, fr_bio_retry_entry_t, expiry_node, _expiry_cmp, NULL);
+ my->next_tl = fr_timer_list_shared_alloc(my, cfg->el->tl, _next_retry_cmp, fr_bio_retry_next_timer,
+ offsetof(fr_bio_retry_entry_t, next_retry_node),
+ offsetof(fr_bio_retry_entry_t, retry.next));
+ if (!my->next_tl) {
+ talloc_free(my);
+ return NULL;
+ }
+
+ my->expiry_tl = fr_timer_list_shared_alloc(my, cfg->el->tl, _expiry_cmp, fr_bio_retry_expiry_timer,
+ offsetof(fr_bio_retry_entry_t, expiry_node),
+ offsetof(fr_bio_retry_entry_t, retry.end));
+ if (!my->expiry_tl) {
+ talloc_free(my);
+ return NULL;
+ }
+
+ /*
+ * The expiry list is run only when writes are blocked. We cannot have both lists active at the
+ * same time.
+ */
+ (void) fr_timer_list_disarm(my->expiry_tl);
my->sent = sent;
if (!rewrite) {
fr_bio_retry_t *my = talloc_get_type_abort(bio, fr_bio_retry_t);
size_t num;
- num = fr_rb_num_elements(&my->next_retry_tree);
+ num = fr_timer_list_num_events(my->next_tl);
if (!my->partial) return num;