/**
* $Id$
- * @file lib/bio/fd.c
- * @brief Binary IO abstractions for packets in buffers
+ * @file lib/bio/queue.c
+ * @brief Binary IO abstractions for queues of raw packets
*
* @copyright 2024 Network RADIUS SAS (legal@networkradius.com)
*/
#include <freeradius-devel/bio/bio_priv.h>
-#include <freeradius-devel/bio/packet.h>
+#include <freeradius-devel/bio/queue.h>
#include <freeradius-devel/bio/null.h>
#include <freeradius-devel/util/dlist.h>
-typedef struct fr_bio_packet_entry_s fr_bio_packet_entry_t;
-typedef struct fr_bio_packet_list_s fr_bio_packet_list_t;
-typedef struct fr_bio_packet_s fr_bio_packet_t;
+typedef struct fr_bio_queue_entry_s fr_bio_queue_entry_t;
+typedef struct fr_bio_queue_list_s fr_bio_queue_list_t;
+typedef struct fr_bio_queue_s fr_bio_queue_t;
/*
* Define type-safe wrappers for head and entry definitions.
*/
-FR_DLIST_TYPES(fr_bio_packet_list)
+FR_DLIST_TYPES(fr_bio_queue_list)
/*
* For delayed writes.
*
* @todo - we can remove the "cancelled" field by setting packet_ctx == my?
*/
-struct fr_bio_packet_entry_s {
+struct fr_bio_queue_entry_s {
void *packet_ctx;
void const *buffer;
size_t size;
size_t already_written;
bool cancelled;
- fr_bio_packet_t *my;
+ fr_bio_queue_t *my;
- FR_DLIST_ENTRY(fr_bio_packet_list) entry; //!< List entry.
+ FR_DLIST_ENTRY(fr_bio_queue_list) entry; //!< List entry.
};
-struct fr_bio_packet_list_s {
- FR_DLIST_HEAD(fr_bio_packet_list) saved;
- FR_DLIST_HEAD(fr_bio_packet_list) free;
+struct fr_bio_queue_list_s {
+ FR_DLIST_HEAD(fr_bio_queue_list) saved;
+ FR_DLIST_HEAD(fr_bio_queue_list) free;
};
-FR_DLIST_FUNCS(fr_bio_packet_list, fr_bio_packet_entry_t, entry)
+FR_DLIST_FUNCS(fr_bio_queue_list, fr_bio_queue_entry_t, entry)
-typedef struct fr_bio_packet_s {
+typedef struct fr_bio_queue_s {
FR_BIO_COMMON;
size_t max_saved;
- fr_bio_packet_saved_t saved;
- fr_bio_packet_callback_t sent;
- fr_bio_packet_callback_t cancel;
+ fr_bio_queue_saved_t saved;
+ fr_bio_queue_callback_t sent;
+ fr_bio_queue_callback_t cancel;
- FR_DLIST_HEAD(fr_bio_packet_list) pending;
- FR_DLIST_HEAD(fr_bio_packet_list) free;
+ FR_DLIST_HEAD(fr_bio_queue_list) pending;
+ FR_DLIST_HEAD(fr_bio_queue_list) free;
- fr_bio_packet_entry_t array[];
-} fr_bio_packet_t;
+ fr_bio_queue_entry_t array[];
+} fr_bio_queue_t;
-static ssize_t fr_bio_packet_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
+static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size);
/** Forcibly cancel all outstanding packets.
*
* shutdown(), when the destructor is called, or on fatal read / write
* errors.
*/
-static void fr_bio_packet_list_cancel(fr_bio_packet_t *my)
+static void fr_bio_queue_list_cancel(fr_bio_queue_t *my)
{
- fr_bio_packet_entry_t *item;
+ fr_bio_queue_entry_t *item;
if (!my->cancel) return;
- if (fr_bio_packet_list_num_elements(&my->pending) == 0) return;
+ if (fr_bio_queue_list_num_elements(&my->pending) == 0) return;
/*
* Cancel any remaining saved items.
*/
- while ((item = fr_bio_packet_list_pop_head(&my->pending)) != NULL) {
+ while ((item = fr_bio_queue_list_pop_head(&my->pending)) != NULL) {
my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size);
item->cancelled = true;
- fr_bio_packet_list_insert_head(&my->free, item);
+ fr_bio_queue_list_insert_head(&my->free, item);
}
}
-static int fr_bio_packet_destructor(fr_bio_packet_t *my)
+static int fr_bio_queue_destructor(fr_bio_queue_t *my)
{
fr_assert(my->cancel); /* otherwise it would be fr_bio_destructor */
my->bio.write = fr_bio_null_write;
- fr_bio_packet_list_cancel(my);
+ fr_bio_queue_list_cancel(my);
return 0;
}
/** Push a packet onto a list.
*
*/
-static ssize_t fr_bio_packet_list_push(fr_bio_packet_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
+static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written)
{
- fr_bio_packet_entry_t *item;
+ fr_bio_queue_entry_t *item;
- item = fr_bio_packet_list_pop_head(&my->free);
+ item = fr_bio_queue_list_pop_head(&my->free);
if (!item) return fr_bio_error(IO_WOULD_BLOCK);
/*
*
* Otherwise, we're a subsequent entry, and we cannot have any data which is partially written.
*/
- fr_assert((fr_bio_packet_list_num_elements(&my->pending) == 0) ||
+ fr_assert((fr_bio_queue_list_num_elements(&my->pending) == 0) ||
(already_written == 0));
item->packet_ctx = packet_ctx;
item->already_written = already_written;
item->cancelled = false;
- fr_bio_packet_list_insert_tail(&my->pending, item);
+ fr_bio_queue_list_insert_tail(&my->pending, item);
if (my->saved) my->saved(&my->bio, packet_ctx, buffer, size, item);
*
* If it blocks, save the packet and return OK to the caller.
*/
-static ssize_t fr_bio_packet_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
+static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
{
ssize_t rcode;
- fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t);
+ fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
fr_bio_t *next;
/*
* We can't call the next bio if there's still cached data to flush.
*/
- fr_assert(fr_bio_packet_list_num_elements(&my->pending) == 0);
+ fr_assert(fr_bio_queue_list_num_elements(&my->pending) == 0);
next = fr_bio_next(&my->bio);
fr_assert(next != NULL);
my->bio.read = fr_bio_eof_read;
my->bio.write = fr_bio_null_write;
- fr_bio_packet_list_cancel(my);
+ fr_bio_queue_list_cancel(my);
return rcode;
}
* The next bio wrote a partial packet. Save the entire packet, and swap the write function to
* save all future packets in the saved list.
*/
- bio->write = fr_bio_packet_write_buffer;
+ bio->write = fr_bio_queue_write_buffer;
- fr_assert(fr_bio_packet_list_num_elements(&my->free) > 0);
+ fr_assert(fr_bio_queue_list_num_elements(&my->free) > 0);
/*
* This can only error out if the free list has no more entries.
*/
- return fr_bio_packet_list_push(my, packet_ctx, buffer, size, (size_t) rcode);
+ return fr_bio_queue_list_push(my, packet_ctx, buffer, size, (size_t) rcode);
}
/** Flush the packet list.
*
*/
-static ssize_t fr_bio_packet_write_flush(fr_bio_packet_t *my, size_t size)
+static ssize_t fr_bio_queue_write_flush(fr_bio_queue_t *my, size_t size)
{
size_t written;
fr_bio_t *next;
- if (fr_bio_packet_list_num_elements(&my->pending) == 0) {
- my->bio.write = fr_bio_packet_write_next;
+ if (fr_bio_queue_list_num_elements(&my->pending) == 0) {
+ my->bio.write = fr_bio_queue_write_next;
return 0;
}
written = 0;
while (written < size) {
ssize_t rcode;
- fr_bio_packet_entry_t *item;
+ fr_bio_queue_entry_t *item;
/*
* No more saved packets to write: stop.
*/
- item = fr_bio_packet_list_head(&my->pending);
+ item = fr_bio_queue_list_head(&my->pending);
if (!item) break;
/*
if (my->sent) my->sent(&my->bio, item->packet_ctx, item->buffer, item->size);
}
- (void) fr_bio_packet_list_pop_head(&my->pending);
+ (void) fr_bio_queue_list_pop_head(&my->pending);
#ifndef NDEBUG
item->buffer = NULL;
item->packet_ctx = NULL;
#endif
item->cancelled = true;
- fr_bio_packet_list_insert_head(&my->free, item);
+ fr_bio_queue_list_insert_head(&my->free, item);
}
/*
* If we've written all of the saved packets, go back to writing to the "next" bio.
*/
- if (fr_bio_packet_list_head(&my->pending)) my->bio.write = fr_bio_packet_write_next;
+ if (fr_bio_queue_list_head(&my->pending)) my->bio.write = fr_bio_queue_write_next;
return written;
}
* The special buffer pointer of NULL means flush(). On flush, we call next->read(), and if that succeeds,
* go back to "pass through" mode for the buffers.
*/
-static ssize_t fr_bio_packet_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
+static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size)
{
- fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t);
+ fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
- if (!buffer) return fr_bio_packet_write_flush(my, size);
+ if (!buffer) return fr_bio_queue_write_flush(my, size);
/*
* This can only error out if the free list has no more entries.
*/
- return fr_bio_packet_list_push(my, packet_ctx, buffer, size, 0);
+ return fr_bio_queue_list_push(my, packet_ctx, buffer, size, 0);
}
/** Read one packet from next bio.
*
* The main
*/
-static ssize_t fr_bio_packet_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
+static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size)
{
int rcode;
- fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t);
+ fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
fr_bio_t *next;
next = fr_bio_next(&my->bio);
my->bio.read = fr_bio_eof_read;
my->bio.write = fr_bio_null_write;
- fr_bio_packet_list_cancel(my);
+ fr_bio_queue_list_cancel(my);
return rcode;
}
*
* Cancel / close has to be called before re-init.
*/
-static int fr_bio_packet_shutdown(fr_bio_t *bio)
+static int fr_bio_queue_shutdown(fr_bio_t *bio)
{
- fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t);
+ fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
- fr_bio_packet_list_cancel(my);
+ fr_bio_queue_list_cancel(my);
- my->bio.read = fr_bio_packet_read;
- my->bio.write = fr_bio_packet_write_next;
+ my->bio.read = fr_bio_queue_read;
+ my->bio.write = fr_bio_queue_write_next;
return 0;
}
*
* The read() API makes no provisions for reading complete packets. It simply returns whatever the next bio
* allows. If instead there is a need to read only complete packets, then the next bio should be
- * fr_bio_mem_packet_alloc().
+ * fr_bio_mem_alloc() with a fr_bio_mem_set_verify()
*
* The read() API may return 0. There may have been data read from an underlying FD, but that data did not
* make it through the filters of the "next" bios. e.g. Any underlying FD should be put into a "wait for
* - NULL on error, memory allocation failed
* - !NULL the bio
*/
-fr_bio_t *fr_bio_packet_alloc(TALLOC_CTX *ctx, size_t max_saved,
- fr_bio_packet_saved_t saved,
- fr_bio_packet_callback_t sent,
- fr_bio_packet_callback_t cancel,
+fr_bio_t *fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved,
+ fr_bio_queue_saved_t saved,
+ fr_bio_queue_callback_t sent,
+ fr_bio_queue_callback_t cancel,
fr_bio_t *next)
{
size_t i;
- fr_bio_packet_t *my;
+ fr_bio_queue_t *my;
if (!max_saved) max_saved = 1;
if (max_saved > (1 << 17)) max_saved = 1 << 17;
- my = (fr_bio_packet_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_bio_packet_t) +
- sizeof(fr_bio_packet_entry_t) * max_saved);
+ my = (fr_bio_queue_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_bio_queue_t) +
+ sizeof(fr_bio_queue_entry_t) * max_saved);
if (!my) return NULL;
- talloc_set_type(my, fr_bio_packet_t);
+ talloc_set_type(my, fr_bio_queue_t);
my->max_saved = max_saved;
- fr_bio_packet_list_init(&my->pending);
- fr_bio_packet_list_init(&my->free);
+ fr_bio_queue_list_init(&my->pending);
+ fr_bio_queue_list_init(&my->free);
my->saved = saved;
my->sent = sent;
for (i = 0; i < max_saved; i++) {
my->array[i].my = my;
my->array[i].cancelled = true;
- fr_bio_packet_list_insert_tail(&my->free, &my->array[i]);
+ fr_bio_queue_list_insert_tail(&my->free, &my->array[i]);
}
- my->bio.read = fr_bio_packet_read;
- my->bio.write = fr_bio_packet_write_next;
- my->cb.shutdown = fr_bio_packet_shutdown;
+ my->bio.read = fr_bio_queue_read;
+ my->bio.write = fr_bio_queue_write_next;
+ my->cb.shutdown = fr_bio_queue_shutdown;
fr_bio_chain(&my->bio, next);
if (my->cancel) {
- talloc_set_destructor(my, fr_bio_packet_destructor);
+ talloc_set_destructor(my, fr_bio_queue_destructor);
} else {
talloc_set_destructor((fr_bio_t *) my, fr_bio_destructor);
}
* e.g. by closing the socket via fr_bio_fd_close(). That function will take care of walking back up the
* chain, and shutdownting each bio.
*
- * @param bio the #fr_bio_packet_t
- * @param ctx The context returned from #fr_bio_packet_saved_t
+ * @param bio the #fr_bio_queue_t
+ * @param ctx The context returned from #fr_bio_queue_saved_t
* @return
* - <0 no such packet was found in the list of saved packets, OR the packet cannot be cancelled.
* - 0 the packet was cancelled.
*/
-int fr_bio_packet_cancel(fr_bio_t *bio, void *ctx)
+int fr_bio_queue_cancel(fr_bio_t *bio, void *ctx)
{
- fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t);
- fr_bio_packet_entry_t *item = ctx;
+ fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t);
+ fr_bio_queue_entry_t *item = ctx;
if (!(item >= &my->array[0]) && (item < &my->array[my->max_saved])) {
return -1;
/*
* Remove it from the saved list, and run the cancellation callback.
*/
- (void) fr_bio_packet_list_remove(&my->pending, item);
- fr_bio_packet_list_insert_head(&my->free, item);
+ (void) fr_bio_queue_list_remove(&my->pending, item);
+ fr_bio_queue_list_insert_head(&my->free, item);
if (my->cancel) my->cancel(bio, item->packet_ctx, item->buffer, item->size);
return 0;