From: Alan T. DeKok Date: Thu, 22 Feb 2024 13:45:43 +0000 (-0500) Subject: move packet list to queue.c X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e92b166eca4e5c875ef25cc8ccd1e0477ccc84e1;p=thirdparty%2Ffreeradius-server.git move packet list to queue.c --- diff --git a/src/lib/bio/libfreeradius-bio.mk b/src/lib/bio/libfreeradius-bio.mk index 7e4473abef4..04d59ad533d 100644 --- a/src/lib/bio/libfreeradius-bio.mk +++ b/src/lib/bio/libfreeradius-bio.mk @@ -9,7 +9,7 @@ SOURCES := \ mem.c \ network.c \ null.c \ - packet.c \ - pipe.c + pipe.c \ + queue.c TGT_PREREQS := libfreeradius-util$(L) diff --git a/src/lib/bio/packet.c b/src/lib/bio/queue.c similarity index 71% rename from src/lib/bio/packet.c rename to src/lib/bio/queue.c index 5f1c996f52d..37b858b96eb 100644 --- a/src/lib/bio/packet.c +++ b/src/lib/bio/queue.c @@ -16,67 +16,67 @@ /** * $Id$ - * @file lib/bio/fd.c - * @brief Binary IO abstractions for packets in buffers + * @file lib/bio/queue.c + * @brief Binary IO abstractions for queues of raw packets * * @copyright 2024 Network RADIUS SAS (legal@networkradius.com) */ #include -#include +#include #include #include -typedef struct fr_bio_packet_entry_s fr_bio_packet_entry_t; -typedef struct fr_bio_packet_list_s fr_bio_packet_list_t; -typedef struct fr_bio_packet_s fr_bio_packet_t; +typedef struct fr_bio_queue_entry_s fr_bio_queue_entry_t; +typedef struct fr_bio_queue_list_s fr_bio_queue_list_t; +typedef struct fr_bio_queue_s fr_bio_queue_t; /* * Define type-safe wrappers for head and entry definitions. */ -FR_DLIST_TYPES(fr_bio_packet_list) +FR_DLIST_TYPES(fr_bio_queue_list) /* * For delayed writes. * * @todo - we can remove the "cancelled" field by setting packet_ctx == my? */ -struct fr_bio_packet_entry_s { +struct fr_bio_queue_entry_s { void *packet_ctx; void const *buffer; size_t size; size_t already_written; bool cancelled; - fr_bio_packet_t *my; + fr_bio_queue_t *my; - FR_DLIST_ENTRY(fr_bio_packet_list) entry; //!< List entry. + FR_DLIST_ENTRY(fr_bio_queue_list) entry; //!< List entry. }; -struct fr_bio_packet_list_s { - FR_DLIST_HEAD(fr_bio_packet_list) saved; - FR_DLIST_HEAD(fr_bio_packet_list) free; +struct fr_bio_queue_list_s { + FR_DLIST_HEAD(fr_bio_queue_list) saved; + FR_DLIST_HEAD(fr_bio_queue_list) free; }; -FR_DLIST_FUNCS(fr_bio_packet_list, fr_bio_packet_entry_t, entry) +FR_DLIST_FUNCS(fr_bio_queue_list, fr_bio_queue_entry_t, entry) -typedef struct fr_bio_packet_s { +typedef struct fr_bio_queue_s { FR_BIO_COMMON; size_t max_saved; - fr_bio_packet_saved_t saved; - fr_bio_packet_callback_t sent; - fr_bio_packet_callback_t cancel; + fr_bio_queue_saved_t saved; + fr_bio_queue_callback_t sent; + fr_bio_queue_callback_t cancel; - FR_DLIST_HEAD(fr_bio_packet_list) pending; - FR_DLIST_HEAD(fr_bio_packet_list) free; + FR_DLIST_HEAD(fr_bio_queue_list) pending; + FR_DLIST_HEAD(fr_bio_queue_list) free; - fr_bio_packet_entry_t array[]; -} fr_bio_packet_t; + fr_bio_queue_entry_t array[]; +} fr_bio_queue_t; -static ssize_t fr_bio_packet_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size); +static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size); /** Forcibly cancel all outstanding packets. * @@ -84,30 +84,30 @@ static ssize_t fr_bio_packet_write_buffer(fr_bio_t *bio, void *packet_ctx, void * shutdown(), when the destructor is called, or on fatal read / write * errors. */ -static void fr_bio_packet_list_cancel(fr_bio_packet_t *my) +static void fr_bio_queue_list_cancel(fr_bio_queue_t *my) { - fr_bio_packet_entry_t *item; + fr_bio_queue_entry_t *item; if (!my->cancel) return; - if (fr_bio_packet_list_num_elements(&my->pending) == 0) return; + if (fr_bio_queue_list_num_elements(&my->pending) == 0) return; /* * Cancel any remaining saved items. */ - while ((item = fr_bio_packet_list_pop_head(&my->pending)) != NULL) { + while ((item = fr_bio_queue_list_pop_head(&my->pending)) != NULL) { my->cancel(&my->bio, item->packet_ctx, item->buffer, item->size); item->cancelled = true; - fr_bio_packet_list_insert_head(&my->free, item); + fr_bio_queue_list_insert_head(&my->free, item); } } -static int fr_bio_packet_destructor(fr_bio_packet_t *my) +static int fr_bio_queue_destructor(fr_bio_queue_t *my) { fr_assert(my->cancel); /* otherwise it would be fr_bio_destructor */ my->bio.write = fr_bio_null_write; - fr_bio_packet_list_cancel(my); + fr_bio_queue_list_cancel(my); return 0; } @@ -115,11 +115,11 @@ static int fr_bio_packet_destructor(fr_bio_packet_t *my) /** Push a packet onto a list. * */ -static ssize_t fr_bio_packet_list_push(fr_bio_packet_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written) +static ssize_t fr_bio_queue_list_push(fr_bio_queue_t *my, void *packet_ctx, const void *buffer, size_t size, size_t already_written) { - fr_bio_packet_entry_t *item; + fr_bio_queue_entry_t *item; - item = fr_bio_packet_list_pop_head(&my->free); + item = fr_bio_queue_list_pop_head(&my->free); if (!item) return fr_bio_error(IO_WOULD_BLOCK); /* @@ -127,7 +127,7 @@ static ssize_t fr_bio_packet_list_push(fr_bio_packet_t *my, void *packet_ctx, co * * Otherwise, we're a subsequent entry, and we cannot have any data which is partially written. */ - fr_assert((fr_bio_packet_list_num_elements(&my->pending) == 0) || + fr_assert((fr_bio_queue_list_num_elements(&my->pending) == 0) || (already_written == 0)); item->packet_ctx = packet_ctx; @@ -136,7 +136,7 @@ static ssize_t fr_bio_packet_list_push(fr_bio_packet_t *my, void *packet_ctx, co item->already_written = already_written; item->cancelled = false; - fr_bio_packet_list_insert_tail(&my->pending, item); + fr_bio_queue_list_insert_tail(&my->pending, item); if (my->saved) my->saved(&my->bio, packet_ctx, buffer, size, item); @@ -147,16 +147,16 @@ static ssize_t fr_bio_packet_list_push(fr_bio_packet_t *my, void *packet_ctx, co * * If it blocks, save the packet and return OK to the caller. */ -static ssize_t fr_bio_packet_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size) +static ssize_t fr_bio_queue_write_next(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size) { ssize_t rcode; - fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t); + fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t); fr_bio_t *next; /* * We can't call the next bio if there's still cached data to flush. */ - fr_assert(fr_bio_packet_list_num_elements(&my->pending) == 0); + fr_assert(fr_bio_queue_list_num_elements(&my->pending) == 0); next = fr_bio_next(&my->bio); fr_assert(next != NULL); @@ -179,7 +179,7 @@ static ssize_t fr_bio_packet_write_next(fr_bio_t *bio, void *packet_ctx, void co my->bio.read = fr_bio_eof_read; my->bio.write = fr_bio_null_write; - fr_bio_packet_list_cancel(my); + fr_bio_queue_list_cancel(my); return rcode; } @@ -192,26 +192,26 @@ static ssize_t fr_bio_packet_write_next(fr_bio_t *bio, void *packet_ctx, void co * The next bio wrote a partial packet. Save the entire packet, and swap the write function to * save all future packets in the saved list. */ - bio->write = fr_bio_packet_write_buffer; + bio->write = fr_bio_queue_write_buffer; - fr_assert(fr_bio_packet_list_num_elements(&my->free) > 0); + fr_assert(fr_bio_queue_list_num_elements(&my->free) > 0); /* * This can only error out if the free list has no more entries. */ - return fr_bio_packet_list_push(my, packet_ctx, buffer, size, (size_t) rcode); + return fr_bio_queue_list_push(my, packet_ctx, buffer, size, (size_t) rcode); } /** Flush the packet list. * */ -static ssize_t fr_bio_packet_write_flush(fr_bio_packet_t *my, size_t size) +static ssize_t fr_bio_queue_write_flush(fr_bio_queue_t *my, size_t size) { size_t written; fr_bio_t *next; - if (fr_bio_packet_list_num_elements(&my->pending) == 0) { - my->bio.write = fr_bio_packet_write_next; + if (fr_bio_queue_list_num_elements(&my->pending) == 0) { + my->bio.write = fr_bio_queue_write_next; return 0; } @@ -224,12 +224,12 @@ static ssize_t fr_bio_packet_write_flush(fr_bio_packet_t *my, size_t size) written = 0; while (written < size) { ssize_t rcode; - fr_bio_packet_entry_t *item; + fr_bio_queue_entry_t *item; /* * No more saved packets to write: stop. */ - item = fr_bio_packet_list_head(&my->pending); + item = fr_bio_queue_list_head(&my->pending); if (!item) break; /* @@ -267,7 +267,7 @@ static ssize_t fr_bio_packet_write_flush(fr_bio_packet_t *my, size_t size) if (my->sent) my->sent(&my->bio, item->packet_ctx, item->buffer, item->size); } - (void) fr_bio_packet_list_pop_head(&my->pending); + (void) fr_bio_queue_list_pop_head(&my->pending); #ifndef NDEBUG item->buffer = NULL; item->packet_ctx = NULL; @@ -276,13 +276,13 @@ static ssize_t fr_bio_packet_write_flush(fr_bio_packet_t *my, size_t size) #endif item->cancelled = true; - fr_bio_packet_list_insert_head(&my->free, item); + fr_bio_queue_list_insert_head(&my->free, item); } /* * If we've written all of the saved packets, go back to writing to the "next" bio. */ - if (fr_bio_packet_list_head(&my->pending)) my->bio.write = fr_bio_packet_write_next; + if (fr_bio_queue_list_head(&my->pending)) my->bio.write = fr_bio_queue_write_next; return written; } @@ -292,16 +292,16 @@ static ssize_t fr_bio_packet_write_flush(fr_bio_packet_t *my, size_t size) * The special buffer pointer of NULL means flush(). On flush, we call next->read(), and if that succeeds, * go back to "pass through" mode for the buffers. */ -static ssize_t fr_bio_packet_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size) +static ssize_t fr_bio_queue_write_buffer(fr_bio_t *bio, void *packet_ctx, void const *buffer, size_t size) { - fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t); + fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t); - if (!buffer) return fr_bio_packet_write_flush(my, size); + if (!buffer) return fr_bio_queue_write_flush(my, size); /* * This can only error out if the free list has no more entries. */ - return fr_bio_packet_list_push(my, packet_ctx, buffer, size, 0); + return fr_bio_queue_list_push(my, packet_ctx, buffer, size, 0); } /** Read one packet from next bio. @@ -313,10 +313,10 @@ static ssize_t fr_bio_packet_write_buffer(fr_bio_t *bio, void *packet_ctx, void * * The main */ -static ssize_t fr_bio_packet_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size) +static ssize_t fr_bio_queue_read(fr_bio_t *bio, void *packet_ctx, void *buffer, size_t size) { int rcode; - fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t); + fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t); fr_bio_t *next; next = fr_bio_next(&my->bio); @@ -337,7 +337,7 @@ static ssize_t fr_bio_packet_read(fr_bio_t *bio, void *packet_ctx, void *buffer, my->bio.read = fr_bio_eof_read; my->bio.write = fr_bio_null_write; - fr_bio_packet_list_cancel(my); + fr_bio_queue_list_cancel(my); return rcode; } @@ -345,14 +345,14 @@ static ssize_t fr_bio_packet_read(fr_bio_t *bio, void *packet_ctx, void *buffer, * * Cancel / close has to be called before re-init. */ -static int fr_bio_packet_shutdown(fr_bio_t *bio) +static int fr_bio_queue_shutdown(fr_bio_t *bio) { - fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t); + fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t); - fr_bio_packet_list_cancel(my); + fr_bio_queue_list_cancel(my); - my->bio.read = fr_bio_packet_read; - my->bio.write = fr_bio_packet_write_next; + my->bio.read = fr_bio_queue_read; + my->bio.write = fr_bio_queue_write_next; return 0; } @@ -369,7 +369,7 @@ static int fr_bio_packet_shutdown(fr_bio_t *bio) * * The read() API makes no provisions for reading complete packets. It simply returns whatever the next bio * allows. If instead there is a need to read only complete packets, then the next bio should be - * fr_bio_mem_packet_alloc(). + * fr_bio_mem_alloc() with a fr_bio_mem_set_verify() * * The read() API may return 0. There may have been data read from an underlying FD, but that data did not * make it through the filters of the "next" bios. e.g. Any underlying FD should be put into a "wait for @@ -401,28 +401,28 @@ static int fr_bio_packet_shutdown(fr_bio_t *bio) * - NULL on error, memory allocation failed * - !NULL the bio */ -fr_bio_t *fr_bio_packet_alloc(TALLOC_CTX *ctx, size_t max_saved, - fr_bio_packet_saved_t saved, - fr_bio_packet_callback_t sent, - fr_bio_packet_callback_t cancel, +fr_bio_t *fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved, + fr_bio_queue_saved_t saved, + fr_bio_queue_callback_t sent, + fr_bio_queue_callback_t cancel, fr_bio_t *next) { size_t i; - fr_bio_packet_t *my; + fr_bio_queue_t *my; if (!max_saved) max_saved = 1; if (max_saved > (1 << 17)) max_saved = 1 << 17; - my = (fr_bio_packet_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_bio_packet_t) + - sizeof(fr_bio_packet_entry_t) * max_saved); + my = (fr_bio_queue_t *) talloc_zero_array(ctx, uint8_t, sizeof(fr_bio_queue_t) + + sizeof(fr_bio_queue_entry_t) * max_saved); if (!my) return NULL; - talloc_set_type(my, fr_bio_packet_t); + talloc_set_type(my, fr_bio_queue_t); my->max_saved = max_saved; - fr_bio_packet_list_init(&my->pending); - fr_bio_packet_list_init(&my->free); + fr_bio_queue_list_init(&my->pending); + fr_bio_queue_list_init(&my->free); my->saved = saved; my->sent = sent; @@ -431,17 +431,17 @@ fr_bio_t *fr_bio_packet_alloc(TALLOC_CTX *ctx, size_t max_saved, for (i = 0; i < max_saved; i++) { my->array[i].my = my; my->array[i].cancelled = true; - fr_bio_packet_list_insert_tail(&my->free, &my->array[i]); + fr_bio_queue_list_insert_tail(&my->free, &my->array[i]); } - my->bio.read = fr_bio_packet_read; - my->bio.write = fr_bio_packet_write_next; - my->cb.shutdown = fr_bio_packet_shutdown; + my->bio.read = fr_bio_queue_read; + my->bio.write = fr_bio_queue_write_next; + my->cb.shutdown = fr_bio_queue_shutdown; fr_bio_chain(&my->bio, next); if (my->cancel) { - talloc_set_destructor(my, fr_bio_packet_destructor); + talloc_set_destructor(my, fr_bio_queue_destructor); } else { talloc_set_destructor((fr_bio_t *) my, fr_bio_destructor); } @@ -457,16 +457,16 @@ fr_bio_t *fr_bio_packet_alloc(TALLOC_CTX *ctx, size_t max_saved, * e.g. by closing the socket via fr_bio_fd_close(). That function will take care of walking back up the * chain, and shutdownting each bio. * - * @param bio the #fr_bio_packet_t - * @param ctx The context returned from #fr_bio_packet_saved_t + * @param bio the #fr_bio_queue_t + * @param ctx The context returned from #fr_bio_queue_saved_t * @return * - <0 no such packet was found in the list of saved packets, OR the packet cannot be cancelled. * - 0 the packet was cancelled. */ -int fr_bio_packet_cancel(fr_bio_t *bio, void *ctx) +int fr_bio_queue_cancel(fr_bio_t *bio, void *ctx) { - fr_bio_packet_t *my = talloc_get_type_abort(bio, fr_bio_packet_t); - fr_bio_packet_entry_t *item = ctx; + fr_bio_queue_t *my = talloc_get_type_abort(bio, fr_bio_queue_t); + fr_bio_queue_entry_t *item = ctx; if (!(item >= &my->array[0]) && (item < &my->array[my->max_saved])) { return -1; @@ -509,8 +509,8 @@ int fr_bio_packet_cancel(fr_bio_t *bio, void *ctx) /* * Remove it from the saved list, and run the cancellation callback. */ - (void) fr_bio_packet_list_remove(&my->pending, item); - fr_bio_packet_list_insert_head(&my->free, item); + (void) fr_bio_queue_list_remove(&my->pending, item); + fr_bio_queue_list_insert_head(&my->free, item); if (my->cancel) my->cancel(bio, item->packet_ctx, item->buffer, item->size); return 0; diff --git a/src/lib/bio/packet.h b/src/lib/bio/queue.h similarity index 55% rename from src/lib/bio/packet.h rename to src/lib/bio/queue.h index c14172a9e3e..ca7e914c41d 100644 --- a/src/lib/bio/packet.h +++ b/src/lib/bio/queue.h @@ -17,26 +17,23 @@ /** * $Id$ - * @file lib/bio/packet.h - * @brief Binary IO abstractions for packets in buffers + * @file lib/bio/queue.h + * @brief Binary IO abstractions for queues of raw packets * * Write packets of data to bios. If a packet is partially * read/written, it is cached for later processing. * - * @todo - Not quite done yet. It still needs to be integrated into the bio framework, - * and be managed through a bio of its own. - * * @copyright 2024 Network RADIUS SAS (legal@networkradius.com) */ -RCSIDH(lib_bio_packet_h, "$Id$") +RCSIDH(lib_bio_queue_h, "$Id$") -typedef void (*fr_bio_packet_callback_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size); -typedef void (*fr_bio_packet_saved_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size, void *ctx); +typedef void (*fr_bio_queue_callback_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size); +typedef void (*fr_bio_queue_saved_t)(fr_bio_t *bio, void *packet_ctx, const void *buffer, size_t size, void *ctx); -fr_bio_t *fr_bio_packet_alloc(TALLOC_CTX *ctx, size_t max_saved, - fr_bio_packet_saved_t saved, - fr_bio_packet_callback_t sent, - fr_bio_packet_callback_t cancel, - fr_bio_t *next) CC_HINT(nonnull(1,6)); +fr_bio_t *fr_bio_queue_alloc(TALLOC_CTX *ctx, size_t max_saved, + fr_bio_queue_saved_t saved, + fr_bio_queue_callback_t sent, + fr_bio_queue_callback_t cancel, + fr_bio_t *next) CC_HINT(nonnull(1,6)); -int fr_bio_packet_cancel(fr_bio_t *bio, void *ctx) CC_HINT(nonnull); +int fr_bio_queue_cancel(fr_bio_t *bio, void *ctx) CC_HINT(nonnull);